You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 16:01:22 UTC

[1/3] incubator-s2graph git commit: [S2GRAPH-121]: Create `Result` class to hold traverse result edges.

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master b5908311a -> 8dbb9a3ee


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
index fda9991..6054d67 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
@@ -20,27 +20,27 @@
 package org.apache.s2graph.core.Integrate
 
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.utils.logger
 import play.api.libs.json.{JsObject, Json}
 
 class CrudTest extends IntegrateCommon {
   import CrudHelper._
   import TestUtil._
 
-  test("test CRUD") {
-    var tcNum = 0
-    var tcString = ""
-    var bulkQueries = List.empty[(Long, String, String)]
-    var expected = Map.empty[String, String]
-
-    val curTime = System.currentTimeMillis
-    val t1 = curTime + 0
-    val t2 = curTime + 1
-    val t3 = curTime + 2
-    val t4 = curTime + 3
-    val t5 = curTime + 4
-
-    val tcRunner = new CrudTestRunner()
-    tcNum = 1
+  var tcString = ""
+  var bulkQueries = List.empty[(Long, String, String)]
+  var expected = Map.empty[String, String]
+
+  val curTime = System.currentTimeMillis
+  val t1 = curTime + 0
+  val t2 = curTime + 1
+  val t3 = curTime + 2
+  val t4 = curTime + 3
+  val t5 = curTime + 4
+
+  val tcRunner = new CrudTestRunner()
+  test("1: [t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test") {
+    val tcNum = 1
     tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test "
 
     bulkQueries = List(
@@ -50,8 +50,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 2
+  }
+  test("2: [t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test") {
+    val tcNum = 2
     tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test "
     bulkQueries = List(
       (t1, "insert", "{\"time\": 10}"),
@@ -60,8 +61,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 3
+  }
+  test("3: [t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test") {
+    val tcNum = 3
     tcString = "[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test "
     bulkQueries = List(
       (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
@@ -70,8 +72,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 4
+  }
+  test("4: [t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test") {
+    val tcNum = 4
     tcString = "[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test "
     bulkQueries = List(
       (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
@@ -80,8 +83,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 5
+  }
+  test("5: [t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test") {
+    val tcNum = 5
     tcString = "[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test"
     bulkQueries = List(
       (t2, "delete", ""),
@@ -90,8 +94,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 6
+  }
+  test("6: [t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test") {
+    val tcNum = 6
     tcString = "[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test "
     bulkQueries = List(
       (t2, "delete", ""),
@@ -100,8 +105,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 7
+  }
+  test("7: [t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test ") {
+    val tcNum = 7
     tcString = "[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test "
     bulkQueries = List(
       (t1, "update", "{\"time\": 10}"),
@@ -110,7 +116,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-    tcNum = 8
+  }
+  test("8: [t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test ") {
+    val tcNum = 8
     tcString = "[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test "
     bulkQueries = List(
       (t1, "update", "{\"time\": 10}"),
@@ -119,7 +127,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-    tcNum = 9
+  }
+  test("9: [t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test") {
+    val tcNum = 9
     tcString = "[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test "
     bulkQueries = List(
       (t2, "delete", ""),
@@ -128,7 +138,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-    tcNum = 10
+  }
+  test("10: [t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test") {
+    val tcNum = 10
     tcString = "[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test"
     bulkQueries = List(
       (t2, "delete", ""),
@@ -137,7 +149,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-    tcNum = 11
+  }
+  test("11: [t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test") {
+    val tcNum = 11
     tcString = "[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test "
     bulkQueries = List(
       (t3, "update", "{\"time\": 10, \"weight\": 20}"),
@@ -146,7 +160,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-    tcNum = 12
+  }
+  test("12: [t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test") {
+    val tcNum = 12
     tcString = "[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test "
     bulkQueries = List(
       (t3, "update", "{\"time\": 10, \"weight\": 20}"),
@@ -155,8 +171,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 13
+  }
+  test("13: [t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test ") {
+    val tcNum = 13
     tcString = "[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test "
     bulkQueries = List(
       (t5, "update", "{\"is_blocked\": true}"),
@@ -169,6 +186,15 @@ class CrudTest extends IntegrateCommon {
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
   }
 
+  test("14 - test lock expire") {
+    // Drive the lock-expiry scenario once for each of the two test labels,
+    // always against vertex id 0.
+    val vertexId = 0
+    List(testLabelName, testLabelName2).foreach { name =>
+      tcRunner.expireTC(name, vertexId)
+    }
+  }
+
 
   object CrudHelper {
 
@@ -191,7 +217,7 @@ class CrudTest extends IntegrateCommon {
           val bulkEdges = (for ((ts, op, props) <- opWithProps) yield {
             TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props)
           })
-
+          println(s"${bulkEdges.mkString("\n")}")
           TestUtil.insertEdgesSync(bulkEdges: _*)
 
           for {
@@ -210,6 +236,7 @@ class CrudTest extends IntegrateCommon {
             val jsResult = TestUtil.getEdgesSync(query)
 
             val results = jsResult \ "results"
+
             val deegrees = (jsResult \ "degrees").as[List[JsObject]]
             val propsLs = (results \\ "props").seq
             (deegrees.head \ LabelMeta.degree.name).as[Int] should be(1)
@@ -229,6 +256,63 @@ class CrudTest extends IntegrateCommon {
         }
       }
 
+      /**
+       * Exercises lock expiry on the self-loop edge `id -> id` of `labelName`.
+       *
+       * Two scenarios run back to back. In each, "update" edges carrying
+       * `time = 10` are written at timestamps 1, 2, 3, ... until one write is
+       * reported as not fully successful. The runner then sleeps for
+       * `graph.storage.LockExpireDuration + 100` and retries with `time = 20`:
+       *  - retry at the older timestamp `i - 1`: expected to be ignored, so the
+       *    stored `time` must still be 10;
+       *  - retry at the newer timestamp `i + 1`: expected to be applied, so the
+       *    stored `time` must become 20.
+       * If no write fails within 999 attempts, nothing is asserted.
+       *
+       * Unlike the earlier version, the `id` parameter is honoured (it used to be
+       * shadowed by a local `val id = 0`) and edges are written to `labelName`,
+       * the label that is queried, rather than always to `testLabelName`.
+       *
+       * @param labelName label whose edge is written and then queried
+       * @param id        vertex id used as both source and target of the edge
+       */
+      def expireTC(labelName: String, id: Int) = {
+        val label = Label.findByName(labelName).get
+        val serviceName = label.serviceName
+        val columnName = label.srcColumnName
+        val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id)
+
+        /** Writes one "update" edge `id -> id` at `ts`; returns the per-edge results. */
+        def updateEdge(ts: Int, time: Int) = {
+          val edge = TestUtil.toEdge(ts, "u", "e", id, id, labelName, Json.obj("time" -> time).toString())
+          TestUtil.insertEdgesSync(edge)
+        }
+
+        /** Runs one scenario; `retryTs` maps the failed timestamp to the retry timestamp. */
+        def runScenario(retryTs: Int => Int, expectedTime: Int): Unit = {
+          var i = 1
+          var verified = false
+          while (!verified && i < 1000) {
+            if (!updateEdge(i, 10).forall(identity)) {
+              // NOTE(review): presumably waits for the lock held after the failed write to expire.
+              Thread.sleep(graph.storage.LockExpireDuration + 100)
+              if (updateEdge(retryTs(i), 20).forall(identity)) {
+                val jsResult = TestUtil.getEdgesSync(queryJson)
+                (jsResult \\ "time").head.as[Int] should be(expectedTime)
+                logger.debug(jsResult)
+                verified = true
+              }
+            }
+            i += 1
+          }
+        }
+
+        // expect the retry request to be ignored
+        runScenario(_ - 1, expectedTime = 10)
+        // expect the retry request to be applied
+        runScenario(_ + 1, expectedTime = 20)
+      }
+
       def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = Json.parse(
         s""" { "srcVertices": [
              { "serviceName": "$serviceName",
@@ -240,6 +324,15 @@ class CrudTest extends IntegrateCommon {
              "offset": 0,
              "limit": 10,
              "cacheTTL": $cacheTTL }]]}""")
+      /** Builds a query from vertex `id` over `labelName` whose `_to` is pinned to the same `id`. */
+      def querySnapshotEdgeJson(serviceName: String, columnName: String, labelName: String, id: Int) = Json.parse(
+        s""" { "srcVertices": [
+             { "serviceName": "$serviceName",
+               "columnName": "$columnName",
+               "id": $id } ],
+             "steps": [ [ {
+             "label": "$labelName",
+             "_to": $id }]]}""")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
index 225d396..b341ec5 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -43,7 +43,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
     config = ConfigFactory.load()
     graph = new Graph(config)(ExecutionContext.Implicits.global)
     management = new Management(graph)
-    parser = new RequestParser(graph.config)
+    parser = new RequestParser(graph)
     initTestData()
   }
 
@@ -120,7 +120,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
     def deleteAllSync(jsValue: JsValue) = {
       val future = Future.sequence(jsValue.as[Seq[JsValue]] map { json =>
         val (labels, direction, ids, ts, vertices) = parser.toDeleteParam(json)
-        val future = graph.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)
+        val srcVertices = vertices
+        val future = graph.deleteAllAdjacentEdges(srcVertices.toList, labels, GraphUtil.directions(direction), ts)
 
         future
       })
@@ -131,10 +132,13 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
     def getEdgesSync(queryJson: JsValue): JsValue = {
       logger.info(Json.prettyPrint(queryJson))
       val restHandler = new RestHandler(graph)
-      Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toSimpleVertexArrJson), HttpRequestWaitingTime)
+      val result = Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toJson), HttpRequestWaitingTime)
+      logger.debug(s"${Json.prettyPrint(result)}")
+      result
     }
 
     def insertEdgesSync(bulkEdges: String*) = {
+      logger.debug(s"${bulkEdges.mkString("\n")}")
       val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true)
       val jsResult = Await.result(req, HttpRequestWaitingTime)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
index 9c52b32..54bb12c 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
@@ -307,7 +307,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
               "label": "$testLabelName",
               "direction": "in",
               "offset": 0,
-              "limit": 10
+              "limit": 1000
             }
           ]]
         }""".stripMargin)
@@ -328,54 +328,54 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
 
 
 
-//  test("pagination and _to") {
-//    def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse(
-//      s"""
-//        { "srcVertices": [
-//          { "serviceName": "${testServiceName}",
-//            "columnName": "${testColumnName}",
-//            "id": ${id}
-//           }],
-//          "steps": [
-//          [ {
-//              "label": "${testLabelName}",
-//              "direction": "out",
-//              "offset": $offset,
-//              "limit": $limit,
-//              "_to": $to
-//            }
-//          ]]
-//        }
-//        """)
-//
-//    val src = System.currentTimeMillis().toInt
-//
-//    val bulkEdges = Seq(
-//      toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
-//      toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
-//      toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)),
-//      toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40))
-//    )
-//    insertEdgesSync(bulkEdges: _*)
-//
-//    var result = getEdgesSync(querySingle(src, offset = 0, limit = 2))
-//    var edges = (result \ "results").as[List[JsValue]]
-//
-//    edges.size should be(2)
-//    (edges(0) \ "to").as[Long] should be(4)
-//    (edges(1) \ "to").as[Long] should be(3)
-//
-//    result = getEdgesSync(querySingle(src, offset = 1, limit = 2))
-//
-//    edges = (result \ "results").as[List[JsValue]]
-//    edges.size should be(2)
-//    (edges(0) \ "to").as[Long] should be(3)
-//    (edges(1) \ "to").as[Long] should be(2)
-//
-//    result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1))
-//    edges = (result \ "results").as[List[JsValue]]
-//    edges.size should be(1)
-//  }
+  test("pagination and _to") {
+    def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "${testServiceName}",
+            "columnName": "${testColumnName}",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelName}",
+              "direction": "out",
+              "offset": $offset,
+              "limit": $limit,
+              "_to": $to
+            }
+          ]]
+        }
+        """)
+    // Source vertex id: the current time in millis, truncated to Int.
+    val src = System.currentTimeMillis().toInt
+    // Four edges src -> 1..4 with increasing timestamps (1001 .. 4004).
+    val bulkEdges = Seq(
+      toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
+      toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
+      toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)),
+      toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40))
+    )
+    insertEdgesSync(bulkEdges: _*)
+    // Page 1 (offset 0, limit 2): asserts targets 4 then 3.
+    var result = getEdgesSync(querySingle(src, offset = 0, limit = 2))
+    var edges = (result \ "results").as[List[JsValue]]
+
+    edges.size should be(2)
+    (edges(0) \ "to").as[Long] should be(4)
+    (edges(1) \ "to").as[Long] should be(3)
+    // Page 2 (offset 1, limit 2): asserts targets 3 then 2, overlapping page 1 by one edge.
+    result = getEdgesSync(querySingle(src, offset = 1, limit = 2))
+
+    edges = (result \ "results").as[List[JsValue]]
+    edges.size should be(2)
+    (edges(0) \ "to").as[Long] should be(3)
+    (edges(1) \ "to").as[Long] should be(2)
+    // With "_to": 1 and limit -1 the query must return exactly one edge.
+    result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1))
+    edges = (result \ "results").as[List[JsValue]]
+    edges.size should be(1)
+  }
 
   test("order by") {
     def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj(
@@ -907,7 +907,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
        """.stripMargin
     )
 
-    def querySingleVertexWithOp(id: String, op: String, shrinkageVal: Long) = Json.parse(
+    def queryWithOp(ids: Seq[String], op: String, shrinkageVal: Long) = Json.parse(
       s"""{
          |  "limit": 10,
          |  "groupBy": ["from"],
@@ -916,7 +916,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
          |    {
          |      "serviceName": "$testServiceName",
          |      "columnName": "$testColumnName",
-         |      "id": $id
+         |      "ids": [${ids.mkString(",")}]
          |    }
          |  ],
          |  "steps": [
@@ -949,47 +949,6 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
        """.stripMargin
     )
 
-    def queryMultiVerticesWithOp(id: String, id2: String, op: String, shrinkageVal: Long) = Json.parse(
-      s"""{
-         |  "limit": 10,
-         |  "groupBy": ["from"],
-         |  "duplicate": "sum",
-         |  "srcVertices": [
-         |    {
-         |      "serviceName": "$testServiceName",
-         |      "columnName": "$testColumnName",
-         |      "ids": [$id, $id2]
-         |    }
-         |  ],
-         |  "steps": [
-         |    {
-         |      "step": [
-         |        {
-         |          "label": "$testLabelName",
-         |          "direction": "out",
-         |          "offset": 0,
-         |          "limit": 10,
-         |          "groupBy": ["from"],
-         |          "duplicate": "countSum",
-         |          "transform": [["_from"]]
-         |        }
-         |      ]
-         |    }, {
-         |      "step": [
-         |        {
-         |          "label": "$testLabelName2",
-         |          "direction": "out",
-         |          "offset": 0,
-         |          "limit": 10,
-         |          "scorePropagateOp": "$op",
-         |          "scorePropagateShrinkage": $shrinkageVal
-         |        }
-         |      ]
-         |    }
-         |  ]
-         |}
-       """.stripMargin
-    )
     val testId = "-30000"
     val testId2 = "-4000"
 
@@ -1014,15 +973,15 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
     val secondStepEdgeCount = 4l
 
     var shrinkageVal = 10l
-    var rs = getEdgesSync(querySingleVertexWithOp(testId, "divide", shrinkageVal))
-    logger.debug(Json.prettyPrint(rs))
+    var rs = getEdgesSync(queryWithOp(Seq(testId), "divide", shrinkageVal))
+
     var results = (rs \ "results").as[List[JsValue]]
     results.size should be(1)
     var scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal)
     (results(0) \ "scoreSum").as[Double] should be(scoreSum)
 
-    rs = getEdgesSync(queryMultiVerticesWithOp(testId, testId2, "divide", shrinkageVal))
-    logger.debug(Json.prettyPrint(rs))
+    rs = getEdgesSync(queryWithOp(Seq(testId, testId2), "divide", shrinkageVal))
+
     results = (rs \ "results").as[List[JsValue]]
     results.size should be(2)
     scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal)
@@ -1033,21 +992,21 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
     // check for divide zero case
     shrinkageVal = 30l
     rs = getEdgesSync(queryWithPropertyOp(testId, "divide", shrinkageVal))
-    logger.debug(Json.prettyPrint(rs))
+
     results = (rs \ "results").as[List[JsValue]]
     results.size should be(1)
     (results(0) \ "scoreSum").as[Double] should be(0)
 
     // "plus" operation
-    rs = getEdgesSync(querySingleVertexWithOp(testId, "plus", shrinkageVal))
-    logger.debug(Json.prettyPrint(rs))
+    rs = getEdgesSync(queryWithOp(Seq(testId), "plus", shrinkageVal))
+
     results = (rs \ "results").as[List[JsValue]]
     results.size should be(1)
     scoreSum = (firstStepEdgeCount + 1) * secondStepEdgeCount
     (results(0) \ "scoreSum").as[Long] should be(scoreSum)
 
     // "multiply" operation
-    rs = getEdgesSync(querySingleVertexWithOp(testId, "multiply", shrinkageVal))
+    rs = getEdgesSync(queryWithOp(Seq(testId), "multiply", shrinkageVal))
     logger.debug(Json.prettyPrint(rs))
     results = (rs \ "results").as[List[JsValue]]
     results.size should be(1)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
index 99b56f7..6092fe4 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
@@ -141,11 +141,12 @@ class StrongLabelDeleteTest extends IntegrateCommon {
   test("large degrees") {
     val labelName = testLabelName2
     val dir = "out"
+    val minSize = 0
     val maxSize = 100
     val deleteSize = 10
     val numOfConcurrentBatch = 100
-    val src = System.currentTimeMillis()
-    val tgts = (0 until maxSize).map { ith => src + ith }
+    val src = 1092983
+    val tgts = (minSize until maxSize).map { ith => src + ith }
     val deleteTgts = Random.shuffle(tgts).take(deleteSize)
     val insertRequests = tgts.map { tgt =>
       Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t")
@@ -181,11 +182,12 @@ class StrongLabelDeleteTest extends IntegrateCommon {
   test("deleteAll") {
     val labelName = testLabelName2
     val dir = "out"
-    val maxSize = 100
+    val minSize = 200
+    val maxSize = 300
     val deleteSize = 10
     val numOfConcurrentBatch = 100
-    val src = System.currentTimeMillis()
-    val tgts = (0 until maxSize).map { ith => src + ith }
+    val src = 192338237
+    val tgts = (minSize until maxSize).map { ith => src + ith }
     val deleteTgts = Random.shuffle(tgts).take(deleteSize)
     val insertRequests = tgts.map { tgt =>
       Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t")
@@ -220,7 +222,7 @@ class StrongLabelDeleteTest extends IntegrateCommon {
 //    val labelName = testLabelName
     val maxTgtId = 10
     val batchSize = 10
-    val testNum = 100
+    val testNum = 10
     val numOfBatch = 10
 
     def testInner(startTs: Long, src: Long) = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
index a1bff68..603ca12 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
@@ -31,6 +31,8 @@ class VertexTestHelper extends IntegrateCommon {
   import TestUtil._
   import VertexTestHelper._
 
+
+
   test("vertex") {
     val ids = (7 until 20).map(tcNum => tcNum * 1000 + 0)
     val (serviceName, columnName) = (testServiceName, testColumnName)
@@ -40,9 +42,10 @@ class VertexTestHelper extends IntegrateCommon {
     println(payload)
 
     val vertices = parser.toVertices(payload, "insert", Option(serviceName), Option(columnName))
-    Await.result(graph.mutateVertices(vertices, withWait = true), HttpRequestWaitingTime)
+    val srcVertices = vertices
+    Await.result(graph.mutateVertices(srcVertices, withWait = true), HttpRequestWaitingTime)
 
-    val res = graph.getVertices(vertices).map { vertices =>
+    val res = graph.getVertices(srcVertices).map { vertices =>
       PostProcess.verticesToJson(vertices)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
index 3f76d59..d62dee8 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,11 +33,12 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
   import WeakLabelDeleteHelper._
 
   test("test weak consistency select") {
+    insertEdgesSync(bulkEdges(): _*)
     var result = getEdgesSync(query(0))
-    println(result)
+
     (result \ "results").as[List[JsValue]].size should be(4)
     result = getEdgesSync(query(10))
-    println(result)
+
     (result \ "results").as[List[JsValue]].size should be(2)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
index 419e9c4..bab6e03 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
@@ -19,10 +19,11 @@
 
 package org.apache.s2graph.core
 
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
 import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
 import org.scalatest.{FunSuite, Matchers}
 
+
 class JsonParserTest extends FunSuite with Matchers with TestCommon {
 
   import InnerVal._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
index 06af38c..61d1096 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.types.LabelWithDirection
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike, HBaseSerializable, LabelWithDirection}
 import org.scalatest.{FunSuite, Matchers}
 
 class QueryParamTest extends FunSuite with Matchers with TestCommon {
@@ -101,5 +101,59 @@ class QueryParamTest extends FunSuite with Matchers with TestCommon {
 
     println(s">> diff: $duration")
   }
+  test("QueryParam interval min/max bytes padding test") {
+    import HBaseSerializable._
+    val queryParam = QueryParam.Empty
+    def compare(_from: Seq[InnerValLike], _to: Seq[InnerValLike], _value: Seq[InnerValLike]): Boolean = {
+      val len = _from.length.toByte
 
+      val from = _from.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal }
+      val to = _to.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal }
+      val value = _value.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal }
+      // Serialize the interval bounds (via paddingInterval) and the probe value to bytes.
+      val (fromBytes, toBytes) = queryParam.paddingInterval(len, from, to)
+      val valueBytes = propsToBytes(value)
+      // The value is inside the interval iff fromBytes <= valueBytes <= toBytes under Bytes.compareTo.
+      val validFrom = Bytes.compareTo(fromBytes, valueBytes) <= 0
+      val validTo = Bytes.compareTo(toBytes, valueBytes) >= 0
+
+      val res = validFrom && validTo
+      //      if (!res) logger.error(s"from: $validFrom, to: $validTo, from: ${_from} to: ${_to} value: ${_value}")
+      res
+    }
+    // Fixed cases: a value equal to both bounds is inside; values outside the bounds are rejected.
+    val v = "v3"
+    compare(
+      Seq(InnerVal.withLong(0L, v)),
+      Seq(InnerVal.withLong(0L, v)),
+      Seq(InnerVal.withLong(0L, v))) shouldBe true
+
+    compare(
+      Seq(InnerVal.withLong(0L, v)),
+      Seq(InnerVal.withLong(0L, v)),
+      Seq(InnerVal.withLong(1L, v))) shouldBe false
+
+    compare(
+      Seq(InnerVal.withLong(1L, v)),
+      Seq(InnerVal.withLong(1L, v)),
+      Seq(InnerVal.withLong(0L, v))) shouldBe false
+
+    compare(
+      Seq(InnerVal.withLong(0L, v)),
+      Seq(InnerVal.withLong(1L, v)),
+      Seq(InnerVal.withLong(2L, v))) shouldBe false
+    // Randomized cases: 1 <= min <= value <= max by construction, so every comparison must hold.
+    val testNum = 100000
+    val tests = for {
+      n <- 0 to testNum
+      min = scala.util.Random.nextInt(Int.MaxValue / 2) + 1
+      max = min + scala.util.Random.nextInt(min)
+      value = min + scala.util.Random.nextInt(max - min + 1)
+    } yield compare(
+        Seq(InnerVal.withLong(min, v)),
+        Seq(InnerVal.withLong(max, v)),
+        Seq(InnerVal.withLong(value, v)))
+
+    tests.forall(identity) shouldBe true
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index 33a901d..584a641 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -146,7 +146,7 @@ trait TestCommonWithModels {
       isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4", None)
 
     management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
-      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None)
+      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4", None)
 
     management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
       isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
index 8ba9ea2..05dcd30 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
@@ -23,41 +23,55 @@ import play.api.libs.json.JsNumber
 import play.libs.Json
 
 class JsonBenchmarkSpec extends BenchmarkCommon {
-  "to json" >> {
-    "json benchmark" >> {
 
-      duration("map to json") {
-        (0 to 10) foreach { n =>
-          val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap
-          Json.toJson(numberMaps)
-        }
+  "JsonBenchSpec" should {
+
+    "Json Append" >> {
+      import play.api.libs.json.{Json, _}
+      val numberJson = Json.toJson((0 to 1000).map { i => s"$i" -> JsNumber(i * i) }.toMap).as[JsObject]
+
+      /** dummy warm-up **/
+      (0 to 10000) foreach { n =>
+        Json.obj(s"$n" -> "dummy") ++ numberJson
+      }
+      (0 to 10000) foreach { n =>
+        Json.obj(s"$n" -> numberJson)
       }
 
-      duration("directMakeJson") {
-        (0 to 10) foreach { n =>
-          var jsObj = play.api.libs.json.Json.obj()
-          (0 to 10).foreach { n =>
-            jsObj += (n.toString -> JsNumber(n * n))
-          }
+      duration("Append by JsObj ++ JsObj ") {
+        (0 to 100000) foreach { n =>
+          numberJson ++ Json.obj(s"$n" -> "dummy")
         }
       }
 
-      duration("map to json 2") {
-        (0 to 50) foreach { n =>
-          val numberMaps = (0 to 10).map { n => (n.toString -> JsNumber(n * n)) }.toMap
-          Json.toJson(numberMaps)
+      duration("Append by Json.obj(newJson -> JsObj)") {
+        (0 to 100000) foreach { n =>
+          Json.obj(s"$n" -> numberJson)
         }
       }
+      true
+    }
+  }
 
-      duration("directMakeJson 2") {
-        (0 to 50) foreach { n =>
-          var jsObj = play.api.libs.json.Json.obj()
-          (0 to 10).foreach { n =>
-            jsObj += (n.toString -> JsNumber(n * n))
-          }
+  "Make Json" >> {
+    duration("map to json") {
+      (0 to 10000) foreach { n =>
+        val numberMaps = (0 to 100).map { n =>
+          n.toString -> JsNumber(n * n)
+        }.toMap
+
+        Json.toJson(numberMaps)
+      }
+    }
+
+    duration("direct") {
+      (0 to 10000) foreach { n =>
+        var jsObj = play.api.libs.json.Json.obj()
+
+        (0 to 100).foreach { n =>
+          jsObj += (n.toString -> JsNumber(n * n))
         }
       }
-      true
     }
     true
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
index a8777fb..ec19641 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
@@ -1,102 +1,105 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.benchmark
-
-import scala.annotation.tailrec
-import scala.util.Random
-
-class SamplingBenchmarkSpec extends BenchmarkCommon {
-  "sample" should {
-
-    "sample benchmark" in {
-      @tailrec
-      def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
-        if (set.size == n) set
-        else randomInt(n, range, set + Random.nextInt(range))
-      }
-
-      // sample using random array
-      def randomArraySample[T](num: Int, ls: List[T]): List[T] = {
-        val randomNum = randomInt(num, ls.size)
-        var sample = List.empty[T]
-        var idx = 0
-        ls.foreach { e =>
-          if (randomNum.contains(idx)) sample = e :: sample
-          idx += 1
-        }
-        sample
-      }
-
-      // sample using shuffle
-      def shuffleSample[T](num: Int, ls: List[T]): List[T] = {
-        Random.shuffle(ls).take(num)
-      }
-
-      // sample using random number generation
-      def rngSample[T](num: Int, ls: List[T]): List[T] = {
-        var sampled = List.empty[T]
-        val N = ls.size // population
-        var t = 0 // total input records dealt with
-        var m = 0 // number of items selected so far
-
-        while (m < num) {
-          val u = Random.nextDouble()
-          if ((N - t) * u < num - m) {
-            sampled = ls(t) :: sampled
-            m += 1
-          }
-          t += 1
-        }
-        sampled
-      }
-
-      // test data
-      val testLimit = 10000
-      val testNum = 10
-      val testData = (0 to 1000).toList
-
-      // dummy for warm-up
-      (0 to testLimit) foreach { n =>
-        randomArraySample(testNum, testData)
-        shuffleSample(testNum, testData)
-        rngSample(testNum, testData)
-      }
-
-      duration("Random Array Sampling") {
-        (0 to testLimit) foreach { _ =>
-          val sampled = randomArraySample(testNum, testData)
-        }
-      }
-
-      duration("Shuffle Sampling") {
-        (0 to testLimit) foreach { _ =>
-          val sampled = shuffleSample(testNum, testData)
-        }
-      }
-
-      duration("RNG Sampling") {
-        (0 to testLimit) foreach { _ =>
-          val sampled = rngSample(testNum, testData)
-        }
-      }
-      true
-    }
-  }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//
+//package org.apache.s2graph.rest.play.benchmark
+//
+//import play.api.test.{FakeApplication, PlaySpecification, WithApplication}
+//
+//import scala.annotation.tailrec
+//import scala.util.Random
+//
+//class SamplingBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
+//  "sample" should {
+//    implicit val app = FakeApplication()
+//
+//    "sample benchmark" in new WithApplication(app) {
+//      @tailrec
+//      def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
+//        if (set.size == n) set
+//        else randomInt(n, range, set + Random.nextInt(range))
+//      }
+//
+//      // sample using random array
+//      def randomArraySample[T](num: Int, ls: List[T]): List[T] = {
+//        val randomNum = randomInt(num, ls.size)
+//        var sample = List.empty[T]
+//        var idx = 0
+//        ls.foreach { e =>
+//          if (randomNum.contains(idx)) sample = e :: sample
+//          idx += 1
+//        }
+//        sample
+//      }
+//
+//      // sample using shuffle
+//      def shuffleSample[T](num: Int, ls: List[T]): List[T] = {
+//        Random.shuffle(ls).take(num)
+//      }
+//
+//      // sample using random number generation
+//      def rngSample[T](num: Int, ls: List[T]): List[T] = {
+//        var sampled = List.empty[T]
+//        val N = ls.size // population
+//        var t = 0 // total input records dealt with
+//        var m = 0 // number of items selected so far
+//
+//        while (m < num) {
+//          val u = Random.nextDouble()
+//          if ((N - t) * u < num - m) {
+//            sampled = ls(t) :: sampled
+//            m += 1
+//          }
+//          t += 1
+//        }
+//        sampled
+//      }
+//
+//      // test data
+//      val testLimit = 10000
+//      val testNum = 10
+//      val testData = (0 to 1000).toList
+//
+//      // dummy for warm-up
+//      (0 to testLimit) foreach { n =>
+//        randomArraySample(testNum, testData)
+//        shuffleSample(testNum, testData)
+//        rngSample(testNum, testData)
+//      }
+//
+//      duration("Random Array Sampling") {
+//        (0 to testLimit) foreach { _ =>
+//          val sampled = randomArraySample(testNum, testData)
+//        }
+//      }
+//
+//      duration("Shuffle Sampling") {
+//        (0 to testLimit) foreach { _ =>
+//          val sampled = shuffleSample(testNum, testData)
+//        }
+//      }
+//
+//      duration("RNG Sampling") {
+//        (0 to testLimit) foreach { _ =>
+//          val sampled = rngSample(testNum, testData)
+//        }
+//      }
+//      true
+//    }
+//  }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
index a654a83..c8f65bf 100644
--- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -25,7 +25,7 @@ import com.typesafe.config.ConfigFactory
 import io.netty.bootstrap.ServerBootstrap
 import io.netty.buffer.{ByteBuf, Unpooled}
 import io.netty.channel._
-import io.netty.channel.epoll.{EpollServerSocketChannel, EpollEventLoopGroup}
+import io.netty.channel.epoll.{EpollEventLoopGroup, EpollServerSocketChannel}
 import io.netty.channel.nio.NioEventLoopGroup
 import io.netty.channel.socket.SocketChannel
 import io.netty.channel.socket.nio.NioServerSocketChannel
@@ -36,14 +36,14 @@ import io.netty.util.CharsetUtil
 import org.apache.s2graph.core.GraphExceptions.BadQueryException
 import org.apache.s2graph.core.mysqls.Experiment
 import org.apache.s2graph.core.rest.RestHandler
-import org.apache.s2graph.core.rest.RestHandler.HandlerResult
+import org.apache.s2graph.core.rest.RestHandler.{CanLookup, HandlerResult}
 import org.apache.s2graph.core.utils.Extensions._
 import org.apache.s2graph.core.utils.logger
 import org.apache.s2graph.core.{Graph, JSONParser, PostProcess}
 import play.api.libs.json._
 
 import scala.collection.mutable
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{ExecutionContext, Future}
 import scala.io.Source
 import scala.util.{Failure, Success, Try}
 import scala.language.existentials
@@ -58,6 +58,10 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
   val NotFound = HttpResponseStatus.NOT_FOUND
   val InternalServerError = HttpResponseStatus.INTERNAL_SERVER_ERROR
 
+  implicit val nettyHeadersLookup = new CanLookup[HttpHeaders] {
+    override def lookup(m: HttpHeaders, key: String) = Option(m.get(key))
+  }
+
   def badRoute(ctx: ChannelHandlerContext) =
     simpleResponse(ctx, BadGateway, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
 
@@ -82,7 +86,7 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
     }
   }
 
-  def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: JsValue, result: HandlerResult, startedAt: Long) = {
+  def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: String, result: HandlerResult, startedAt: Long) = {
     var closeOpt = CloseOpt
     var headers = mutable.ArrayBuilder.make[(String, String)]
 
@@ -119,43 +123,68 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
     }
   }
 
+  private def healthCheck(ctx: ChannelHandlerContext)(predicate: Boolean): Unit = {
+    if (predicate) {
+      val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8)
+      simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt)
+    } else {
+      simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt)
+    }
+  }
+
+  private def updateHealthCheck(ctx: ChannelHandlerContext)(newValue: Boolean)(updateOp: Boolean => Unit): Unit = {
+    updateOp(newValue)
+    val newHealthCheckMsg = Unpooled.copiedBuffer(newValue.toString, CharsetUtil.UTF_8)
+    simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt)
+  }
+
   override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = {
     val uri = req.getUri
     val startedAt = System.currentTimeMillis()
-
+    val checkFunc = healthCheck(ctx) _
+    val updateFunc = updateHealthCheck(ctx) _
     req.getMethod match {
       case HttpMethod.GET =>
         uri match {
-          case "/health_check.html" =>
-            if (NettyServer.isHealthy) {
-              val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8)
-              simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt)
+          case "/health_check.html" => checkFunc(NettyServer.isHealthy)
+          case "/fallback_check.html" => checkFunc(NettyServer.isFallbackHealthy)
+          case "/query_fallback_check.html" => checkFunc(NettyServer.isQueryFallbackHealthy)
+          case s if s.startsWith("/graphs/getEdge/") =>
+            if (!NettyServer.isQueryFallbackHealthy) {
+              val result = HandlerResult(body = Future.successful(PostProcess.emptyResults))
+              toResponse(ctx, req, s, result, startedAt)
             } else {
-              simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt)
+              val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4)
+              val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+              val result = s2rest.checkEdges(params)
+              toResponse(ctx, req, s, result, startedAt)
             }
-
-          case s if s.startsWith("/graphs/getEdge/") =>
-            // src, tgt, label, dir
-            val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4)
-            val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
-            val result = s2rest.checkEdges(params)
-            toResponse(ctx, req, params, result, startedAt)
           case _ => badRoute(ctx)
         }
 
       case HttpMethod.PUT =>
         if (uri.startsWith("/health_check/")) {
-          val newHealthCheck = uri.split("/").last.toBoolean
-          NettyServer.isHealthy = newHealthCheck
-          val newHealthCheckMsg = Unpooled.copiedBuffer(NettyServer.isHealthy.toString, CharsetUtil.UTF_8)
-          simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt)
-        } else badRoute(ctx)
+          val newValue = uri.split("/").last.toBoolean
+          updateFunc(newValue) { v => NettyServer.isHealthy = v }
+        } else if (uri.startsWith("/query_fallback_check/")) {
+          val newValue = uri.split("/").last.toBoolean
+          updateFunc(newValue) { v => NettyServer.isQueryFallbackHealthy = v }
+        } else if (uri.startsWith("/fallback_check/")) {
+          val newValue = uri.split("/").last.toBoolean
+          updateFunc(newValue) { v => NettyServer.isFallbackHealthy = v }
+        } else {
+          badRoute(ctx)
+        }
 
       case HttpMethod.POST =>
         val body = req.content.toString(CharsetUtil.UTF_8)
-
-        val result = s2rest.doPost(uri, body, Option(req.headers().get(Experiment.impressionKey)))
-        toResponse(ctx, req, Json.parse(body), result, startedAt)
+        if (!NettyServer.isQueryFallbackHealthy) {
+          val result = HandlerResult(body = Future.successful(PostProcess.emptyResults))
+          toResponse(ctx, req, body, result, startedAt)
+        } else {
+          val result = s2rest.doPost(uri, body, req.headers())
+          toResponse(ctx, req, body, result, startedAt)
+        }
 
       case _ =>
         simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
@@ -163,9 +192,14 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
   }
 
   override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
-    cause.printStackTrace()
-    logger.error(s"exception on query.", cause)
-    simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+    cause match {
+      case e: java.io.IOException =>
+        ctx.channel().close().addListener(CloseOpt.get)
+      case _ =>
+        cause.printStackTrace()
+        logger.error(s"exception on query.", cause)
+        simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+    }
   }
 }
 
@@ -178,6 +212,7 @@ object NettyServer {
   val config = ConfigFactory.load()
   val port = Try(config.getInt("http.port")).recover { case _ => 9000 }.get
   val transport = Try(config.getString("netty.transport")).recover { case _ => "jdk" }.get
+  val maxBodySize = Try(config.getInt("max.body.size")).recover { case _ => 65536 * 2 }.get
 
   // init s2graph with config
   val s2graph = new Graph(config)(ec)
@@ -185,6 +220,8 @@ object NettyServer {
 
   val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
   var isHealthy = config.getBooleanWithFallback("app.health.on", true)
+  var isFallbackHealthy = true
+  var isQueryFallbackHealthy = true
 
   logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
   logger.info(s"transport: $transport")
@@ -201,14 +238,13 @@ object NettyServer {
     try {
       val b: ServerBootstrap = new ServerBootstrap()
         .option(ChannelOption.SO_BACKLOG, Int.box(2048))
-
       b.group(bossGroup, workerGroup).channel(channelClass)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer[SocketChannel] {
           override def initChannel(ch: SocketChannel) {
             val p = ch.pipeline()
             p.addLast(new HttpServerCodec())
-            p.addLast(new HttpObjectAggregator(65536))
+            p.addLast(new HttpObjectAggregator(maxBodySize))
             p.addLast(new S2RestHandler(rest)(ec))
           }
         })

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
index 30a5ee4..692ab1e 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
@@ -52,7 +52,7 @@ object Global extends WithFilters(new GzipFilter()) {
     // init s2graph with config
     s2graph = new Graph(config)(ec)
     storageManagement = new Management(s2graph)
-    s2parser = new RequestParser(s2graph.config) // merged config
+    s2parser = new RequestParser(s2graph) 
     s2rest = new RestHandler(s2graph)(ec)
 
     logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
@@ -83,7 +83,7 @@ object Global extends WithFilters(new GzipFilter()) {
     wallLogHandler.shutdown()
     QueueActor.shutdown()
 
-    /*
+    /**
      * shutdown hbase client for flush buffers.
      */
     shutdown()

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
index 3c488fe..3c49954 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
@@ -43,8 +43,12 @@ object Config {
   lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("kafka.metadata.broker.list").getOrElse("localhost")
 
   lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}"
+  lazy val KAFKA_LOG_TOPIC_JSON = s"s2graphIn${PHASE}Json"
   lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async"
+  lazy val KAFKA_LOG_TOPIC_ASYNC_JSON = s"s2graphIn${PHASE}AsyncJson"
   lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed"
+  lazy val KAFKA_FAIL_TOPIC_JSON = s"s2graphIn${PHASE}FailedJson"
+  lazy val KAFKA_MUTATE_FAIL_TOPIC = s"mutateFailed_${PHASE}"
 
   // is query or write
   lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
index 13639b9..b32c16a 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
@@ -22,10 +22,10 @@ package org.apache.s2graph.rest.play.controllers
 import akka.util.ByteString
 import org.apache.s2graph.core.GraphExceptions.BadQueryException
 import org.apache.s2graph.core.PostProcess
+import org.apache.s2graph.core.rest.RestHandler.CanLookup
 import org.apache.s2graph.core.utils.logger
 import org.apache.s2graph.rest.play.config.Config
 import play.api.http.HttpEntity
-import play.api.libs.iteratee.Enumerator
 import play.api.libs.json.{JsString, JsValue}
 import play.api.mvc._
 
@@ -42,6 +42,10 @@ object ApplicationController extends Controller {
 
   val jsonText: BodyParser[String] = s2parse.jsonText
 
+  implicit val oneTupleLookup = new CanLookup[Headers] {
+    override def lookup(m: Headers, key: String) = m.get(key)
+  }
+
   private def badQueryExceptionResults(ex: Exception) =
     Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader))
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
index 53f3fce..b3ac89d 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
@@ -156,9 +156,11 @@ object CounterController extends Controller {
           useProfile = useProfile, bucketImpId, useRank = useRank, ttl, dailyTtl, Some(hbaseTable), intervalUnit,
           rateActionId, rateBaseId, rateThreshold)
 
-        // prepare exact storage
-        exactCounter(version).prepare(policy)
-        if (useRank) {
+        if (rateAction.isEmpty) {
+          // prepare exact storage
+          exactCounter(version).prepare(policy)
+        }
+        if (useRank || rateAction.isDefined) {
           // prepare ranking storage
           rankingCounter(version).prepare(policy)
         }
@@ -253,8 +255,11 @@ object CounterController extends Controller {
           // change table name
           val newTableName = Seq(tablePrefixMap(version), service, policy.ttl) ++ policy.dailyTtl mkString "_"
           val newPolicy = policy.copy(version = version, hbaseTable = Some(newTableName))
-          exactCounter(version).prepare(newPolicy)
-          if (newPolicy.useRank) {
+
+          if (newPolicy.rateActionId.isEmpty) {
+            exactCounter(version).prepare(newPolicy)
+          }
+          if (newPolicy.useRank || newPolicy.rateActionId.isDefined) {
             rankingCounter(version).prepare(newPolicy)
           }
           Ok(Json.toJson(Map("msg" -> s"prepare storage v$version $service/$action")))
@@ -272,8 +277,10 @@ object CounterController extends Controller {
         policy <- counterModel.findByServiceAction(service, action, useCache = false)
       } yield {
         Try {
-          exactCounter(policy.version).destroy(policy)
-          if (policy.useRank) {
+          if (policy.rateActionId.isEmpty) {
+            exactCounter(policy.version).destroy(policy)
+          }
+          if (policy.useRank || policy.rateActionId.isDefined) {
             rankingCounter(policy.version).destroy(policy)
           }
           counterModel.deleteServiceAction(policy)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index da88c3d..8000cf8 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -19,6 +19,7 @@
 
 package org.apache.s2graph.rest.play.controllers
 
+import com.fasterxml.jackson.databind.JsonMappingException
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.Label
 import org.apache.s2graph.core.rest.RequestParser
@@ -30,19 +31,19 @@ import play.api.mvc.{Controller, Result}
 
 import scala.collection.Seq
 import scala.concurrent.Future
-import scala.util.Random
 
 object EdgeController extends Controller {
 
   import ApplicationController._
+  import ExceptionHandler._
   import play.api.libs.concurrent.Execution.Implicits._
 
   private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
   private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser
   private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
 
-  private def enqueue(topic: String, elem: GraphElement, tsv: String) = {
-    val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, Option(tsv))
+  private def enqueue(topic: String, elem: GraphElement, tsv: String, publishJson: Boolean = false) = {
+    val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, Option(tsv), publishJson)
     walLogHandler.enqueue(kafkaMessage)
   }
 
@@ -50,63 +51,129 @@ object EdgeController extends Controller {
     val kafkaTopic = toKafkaTopic(graphElem.isAsync)
 
     graphElem match {
-      case v: Vertex => enqueue(kafkaTopic, graphElem, tsv)
+      case v: Vertex =>
+        enqueue(kafkaTopic, graphElem, tsv)
       case e: Edge =>
         e.label.extraOptions.get("walLog") match {
-          case None => enqueue(kafkaTopic, e, tsv)
+          case None =>
+            enqueue(kafkaTopic, e, tsv)
           case Some(walLogOpt) =>
-            (walLogOpt \ "method").as[JsValue] match {
+            (walLogOpt \ "method").get match {
               case JsString("drop") => // pass
               case JsString("sample") =>
                 val rate = (walLogOpt \ "rate").as[Double]
-                if (scala.util.Random.nextDouble() < rate) enqueue(kafkaTopic, e, tsv)
-              case _ => enqueue(kafkaTopic, e, tsv)
+                if (scala.util.Random.nextDouble() < rate) {
+                  enqueue(kafkaTopic, e, tsv)
+                }
+              case _ =>
+                enqueue(kafkaTopic, e, tsv)
             }
         }
+      case _ => logger.error(s"Unknown graph element type: ${graphElem}")
     }
   }
 
+  /** Builds one "deleteAll" TSV message for the mutate-fail topic per (source vertex, label) pair. */
+  private def toDeleteAllFailMessages(srcVertices: Seq[Vertex], labels: Seq[Label], dir: Int, ts: Long) = {
+    for {
+      vertex <- srcVertices
+      id = vertex.id.toString
+      label <- labels
+      // `dir` is a direction code (callers pass a direction), so it must not be decoded as an operation byte
+      tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromDirection(dir)).mkString("\t")
+    } yield ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, tsv)
+  }
+
+  /** Enqueues each of the given Kafka messages on `walLogHandler`. */
+  private def publishFailTopic(kafkaMessages: Seq[KafkaMessage]): Unit =
+    for (message <- kafkaMessages) walLogHandler.enqueue(message)
+
+  /** Mutates `elements` and publishes every edge whose mutation returned false to the mutate-fail topic. */
+  def mutateElementsWithFailLog(elements: Seq[(GraphElement, String)]) = {
+    val result = s2.mutateElements(elements.map(_._1), true)
+    result onComplete {
+      // a failed future used to surface as an exception thrown by `results.get` inside this callback
+      case scala.util.Failure(ex) => logger.error(s"mutateElementsWithFailLog: ${ex.getMessage}", ex)
+      case scala.util.Success(rets) => rets.zip(elements).foreach {
+        case (false, (e: Edge, _)) if e.op == GraphUtil.operations("deleteAll") =>
+          publishFailTopic(toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.label), e.labelWithDir.dir, e.ts))
+        case (false, (e: Edge, tsv)) =>
+          publishFailTopic(Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv))))
+        case _ => // stored successfully, or not an edge: nothing to publish
+      }
+    }
+    result
+  }
+
   private def tryMutate(elementsWithTsv: Seq[(GraphElement, String)], withWait: Boolean): Future[Result] = {
     if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
     else {
-      try {
-        elementsWithTsv.foreach { case (graphElem, tsv) =>
-          publish(graphElem, tsv)
-        }
+      elementsWithTsv.foreach { case (graphElem, tsv) =>
+        publish(graphElem, tsv)
+      }
 
-        val elementsToStore = for {
-          (e, _tsv) <- elementsWithTsv if !skipElement(e.isAsync)
-        } yield e
-
-        if (elementsToStore.isEmpty) Future.successful(jsonResponse(JsArray()))
-        else {
-          if (withWait) {
-            val rets = s2.mutateElements(elementsToStore, withWait)
-            rets.map(Json.toJson(_)).map(jsonResponse(_))
-          } else {
-            val rets = elementsToStore.map { element => QueueActor.router ! element; true }
-            Future.successful(jsonResponse(Json.toJson(rets)))
+      if (elementsWithTsv.isEmpty) Future.successful(jsonResponse(JsArray()))
+      else {
+        val elementWithIdxs = elementsWithTsv.zipWithIndex
+        if (withWait) {
+          val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) =>
+            !skipElement(element.isAsync)
+          }
+          val retToSkip = elementAsync.map(_._2 -> true)
+          val elementsToStore = elementSync.map(_._1)
+          val elementsIdxToStore = elementSync.map(_._2)
+          mutateElementsWithFailLog(elementsToStore).map { rets =>
+            elementsIdxToStore.zip(rets) ++ retToSkip
+          }.map { rets =>
+            Json.toJson(rets.sortBy(_._1).map(_._2))
+          }.map(jsonResponse(_))
+        } else {
+          val rets = elementWithIdxs.map { case ((element, tsv), idx) =>
+            if (!skipElement(element.isAsync)) QueueActor.router ! (element, tsv)
+            true
           }
+          Future.successful(jsonResponse(Json.toJson(rets)))
         }
-      } catch {
-        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
-        case e: Throwable =>
-          logger.error(s"tryMutate: ${e.getMessage}", e)
-          Future.successful(InternalServerError(s"${e.getStackTrace}"))
       }
     }
   }
 
   def mutateJsonFormat(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
     logger.debug(s"$jsValue")
-    val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation)
-    tryMutate(edgesWithTsv, withWait)
+
+    try {
+      val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation)
+      tryMutate(edgesWithTsv, withWait)
+    } catch {
+      case e: JsonMappingException =>
+        logger.malformed(jsValue, e)
+        Future.successful(BadRequest(s"${e.getMessage}"))
+      case e: GraphExceptions.JsonParseException  =>
+        logger.malformed(jsValue, e)
+        Future.successful(BadRequest(s"${e.getMessage}"))
+      case e: Exception =>
+        logger.malformed(jsValue, e)
+        Future.failed(e)
+    }
   }
 
   def mutateBulkFormat(str: String, withWait: Boolean = false): Future[Result] = {
     logger.debug(s"$str")
-    val elementsWithTsv = requestParser.parseBulkFormat(str)
-    tryMutate(elementsWithTsv, withWait)
+
+    try {
+      val elementsWithTsv = requestParser.parseBulkFormat(str)
+      tryMutate(elementsWithTsv, withWait)
+    } catch {
+      case e: JsonMappingException =>
+        logger.malformed(str, e)
+        Future.successful(BadRequest(s"${e.getMessage}"))
+      case e: GraphExceptions.JsonParseException  =>
+        logger.malformed(str, e)
+        Future.successful(BadRequest(s"${e.getMessage}"))
+      case e: Exception =>
+        logger.malformed(str, e)
+        Future.failed(e)
+    }
   }
 
   def mutateBulk() = withHeaderAsync(parse.text) { request =>
@@ -163,29 +230,34 @@ object EdgeController extends Controller {
 
     if (edges.isEmpty) Future.successful(jsonResponse(JsArray()))
     else {
+
       s2.incrementCounts(edges, withWait = true).map { results =>
         val json = results.map { case (isSuccess, resultCount) =>
           Json.obj("success" -> isSuccess, "result" -> resultCount)
         }
+
         jsonResponse(Json.toJson(json))
       }
     }
   }
 
   def deleteAll() = withHeaderAsync(jsonParser) { request =>
-//    deleteAllInner(request.body, withWait = false)
     deleteAllInner(request.body, withWait = true)
   }
 
+  /** Delete-all endpoint variant that calls `deleteAllInner` with `withWait = false`. */
+  def deleteAllWithOutWait() =
+    withHeaderAsync(jsonParser)(request => deleteAllInner(request.body, withWait = false))
+
   def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
 
-    /* logging for delete all request */
+    /** logging for delete all request */
     def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = {
       val kafkaMessages = for {
         id <- ids
         label <- labels
       } yield {
-        val tsv = Seq(ts, "deleteAll", "e", requestParser.jsToStr(id), requestParser.jsToStr(id), label.label, "{}", direction).mkString("\t")
+        val tsv = Seq(ts, "deleteAll", "e", RequestParser.jsToStr(id), RequestParser.jsToStr(id), label.label, "{}", direction).mkString("\t")
         val topic = topicOpt.getOrElse { toKafkaTopic(label.isAsync) }
 
         ExceptionHandler.toKafkaMessage(topic, tsv)
@@ -194,10 +266,18 @@ object EdgeController extends Controller {
       kafkaMessages.foreach(walLogHandler.enqueue)
     }
 
-    def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], ts: Long, vertices: Seq[Vertex]) = {
-      enqueueLogMessage(ids, labels, ts, direction, None)
+    def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue],
+                   ts: Long, vertices: Seq[Vertex]) = {
+
       val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)
       if (withWait) {
+        future onComplete {
+          case scala.util.Success(false) =>
+            publishFailTopic(toDeleteAllFailMessages(vertices.toList, labels, GraphUtil.directions(direction), ts))
+          // `ret.get` used to rethrow a failed future inside this callback; log it explicitly instead
+          case scala.util.Failure(ex) => logger.error(s"deleteAllAdjacentEdges: ${ex.getMessage}", ex)
+          case _ => // deleted successfully; nothing to publish
+        }
         future
       } else {
         Future.successful(true)
@@ -205,9 +285,13 @@ object EdgeController extends Controller {
     }
 
     val deleteFutures = jsValue.as[Seq[JsValue]].map { json =>
-      val (labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json)
+      val (_labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json)
+      val srcVertices = vertices
+      enqueueLogMessage(ids, _labels, ts, direction, None)
+      val labels = _labels.filterNot(e => skipElement(e.isAsync))
+
       if (labels.isEmpty || ids.isEmpty) Future.successful(true)
-      else deleteEach(labels, direction, ids, ts, vertices)
+      else deleteEach(labels, direction, ids, ts, srcVertices)
     }
 
     val deleteResults = Future.sequence(deleteFutures)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
index 9c4a061..760211a 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
@@ -19,7 +19,6 @@
 
 package org.apache.s2graph.rest.play.controllers
 
-import org.apache.s2graph.core.mysqls.Experiment
 import org.apache.s2graph.core.rest.RestHandler
 import play.api.mvc._
 
@@ -30,10 +29,10 @@ object ExperimentController extends Controller {
 
   import ApplicationController._
 
+  def experiments() = experiment("", "", "")
   def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(jsonText) { request =>
     val body = request.body
-
-    val res = rest.doPost(request.uri, body, request.headers.get(Experiment.impressionKey))
+    val res = rest.doPost(request.uri, body, request.headers)
     res.body.map { case js =>
       val headers = res.headers :+ ("result_size" -> rest.calcSize(js).toString)
       jsonResponse(js, headers: _*)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
index 0260b7a..6a7d4f7 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
@@ -63,12 +63,4 @@ object PublishController extends Controller {
 
   def publish(topic: String) = publishOnly(topic)
 
-  //  def mutateBulk(topic: String) = Action.async(parse.text) { request =>
-  //    EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, Config.KAFKA_FAIL_TOPIC, request.body).map { result =>
-  //      result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10")
-  //    }
-  //  }
-  def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request =>
-    EdgeController.mutateBulkFormat(request.body)
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
index 495cf7b..1e49ca7 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
@@ -19,8 +19,6 @@
 
 package org.apache.s2graph.rest.play.controllers
 
-import org.apache.s2graph.core.JSONParser
-import org.apache.s2graph.core.mysqls.Experiment
 import org.apache.s2graph.core.rest.RestHandler
 import play.api.libs.json.Json
 import play.api.mvc._
@@ -31,13 +29,11 @@ object QueryController extends Controller {
 
   import ApplicationController._
   import play.api.libs.concurrent.Execution.Implicits.defaultContext
-
   private val rest: RestHandler = org.apache.s2graph.rest.play.Global.s2rest
 
   def delegate(request: Request[String]) = {
-    rest.doPost(request.uri, request.body, request.headers.get(Experiment.impressionKey)).body.map {
-      js =>
-        jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
+    rest.doPost(request.uri, request.body, request.headers).body.map { js =>
+      jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
     } recoverWith ApplicationController.requestFallback(request.body)
   }
 
@@ -58,13 +54,12 @@ object QueryController extends Controller {
   def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonText)(delegate)
 
   def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) =
-    withHeaderAsync(jsonText) {
-      request =>
-        val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
-        rest.checkEdges(params).body.map {
-          js =>
-            jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
-        } recoverWith ApplicationController.requestFallback(request.body)
+    withHeaderAsync(jsonText) { request =>
+      val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+      rest.checkEdges(params).body.map {
+        js =>
+          jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
+      } recoverWith ApplicationController.requestFallback(request.body)
     }
 
   def getVertices() = withHeaderAsync(jsonText)(delegate)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
index 0fdbe43..72e6e82 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
@@ -50,14 +50,16 @@ object VertexController extends Controller {
         }
 
         //FIXME:
-        val verticesToStore = vertices.filterNot(v => v.isAsync)
-
-        if (withWait) {
-          val rets = s2.mutateVertices(verticesToStore, withWait = true)
-          rets.map(Json.toJson(_)).map(jsonResponse(_))
-        } else {
-          val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true }
-          Future.successful(jsonResponse(Json.toJson(rets)))
+        val verticesToStore = vertices.filterNot(v => skipElement(v.isAsync))
+        if (verticesToStore.isEmpty) Future.successful(jsonResponse(Json.toJson(Seq.empty[Boolean])))
+        else {
+          if (withWait) {
+            val rets = s2.mutateVertices(verticesToStore, withWait = true)
+            rets.map(Json.toJson(_)).map(jsonResponse(_))
+          } else {
+            val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true }
+            Future.successful(jsonResponse(Json.toJson(rets)))
+          }
         }
       } catch {
         case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"e"))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/conf/reference.conf
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/reference.conf b/s2rest_play/conf/reference.conf
index bda503c..c3b716b 100644
--- a/s2rest_play/conf/reference.conf
+++ b/s2rest_play/conf/reference.conf
@@ -125,6 +125,7 @@ local.queue.actor.rate.limit=1000000
 # local retry number
 max.retry.number=100
 max.back.off=50
+back.off.timeout=1000
 delete.all.fetch.size=10000
 hbase.fail.prob=-1.0
 lock.expire.time=600000

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/conf/routes
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/routes b/s2rest_play/conf/routes
index ee49c69..c360e6f 100644
--- a/s2rest_play/conf/routes
+++ b/s2rest_play/conf/routes
@@ -23,7 +23,7 @@
 
 
 # publish
-POST        /publish/:topic                                               org.apache.s2graph.rest.play.controllers.PublishController.mutateBulk(topic)
+#POST        /publish/:topic                                               org.apache.s2graph.rest.play.controllers.PublishController.mutateBulk(topic)
 POST        /publishOnly/:topic                                           org.apache.s2graph.rest.play.controllers.PublishController.publishOnly(topic)
 
 #### Health Check
@@ -37,6 +37,7 @@ POST        /graphs/edges/insertBulk                                      org.ap
 POST        /graphs/edges/delete                                          org.apache.s2graph.rest.play.controllers.EdgeController.deletes()
 POST        /graphs/edges/deleteWithWait                                  org.apache.s2graph.rest.play.controllers.EdgeController.deletesWithWait()
 POST        /graphs/edges/deleteAll                                       org.apache.s2graph.rest.play.controllers.EdgeController.deleteAll()
+POST        /graphs/edges/deleteAllWithOutWait                            org.apache.s2graph.rest.play.controllers.EdgeController.deleteAllWithOutWait()
 POST        /graphs/edges/update                                          org.apache.s2graph.rest.play.controllers.EdgeController.updates()
 POST        /graphs/edges/updateWithWait                                  org.apache.s2graph.rest.play.controllers.EdgeController.updatesWithWait()
 POST        /graphs/edges/increment                                       org.apache.s2graph.rest.play.controllers.EdgeController.increments()
@@ -81,7 +82,7 @@ GET         /graphs/getLabels/:serviceName                                org.ap
 POST        /graphs/createLabel                                           org.apache.s2graph.rest.play.controllers.AdminController.createLabel()
 POST        /graphs/addIndex                                              org.apache.s2graph.rest.play.controllers.AdminController.addIndex()
 GET         /graphs/getLabel/:labelName                                   org.apache.s2graph.rest.play.controllers.AdminController.getLabel(labelName)
-PUT         /graphs/deleteLabel/:labelName                                org.apache.s2graph.rest.play.controllers.AdminController.deleteLabel(labelName)
+PUT         /graphs/deleteLabelReally/:labelName                          org.apache.s2graph.rest.play.controllers.AdminController.deleteLabel(labelName)
 
 POST        /graphs/addProp/:labelName                                    org.apache.s2graph.rest.play.controllers.AdminController.addProp(labelName)
 POST        /graphs/createServiceColumn                                   org.apache.s2graph.rest.play.controllers.AdminController.createServiceColumn()
@@ -117,7 +118,7 @@ POST    /counter/v1/mget                                                org.apac
 
 # Experiment API
 POST        /graphs/experiment/:accessToken/:experimentName/:uuid         org.apache.s2graph.rest.play.controllers.ExperimentController.experiment(accessToken, experimentName, uuid)
-
+POST        /graphs/experiments                                           org.apache.s2graph.rest.play.controllers.ExperimentController.experiments()
 
 # Map static resources from the /public folder to the /assets URL path
 GET         /images/*file                                                 controllers.Assets.at(path="/public/images", file)


[2/3] incubator-s2graph git commit: [S2GRAPH-121]: Create `Result` class to hold traverse result edges.

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
index ede1127..0377bd8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
@@ -19,10 +19,53 @@
 
 package org.apache.s2graph.core
 
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId}
 import play.api.libs.json.Json
-
+//
+//object S2Vertex {
+//  def apply(graph: Graph, vertex: Vertex): S2Vertex = {
+//    S2Vertex(graph,
+//      vertex.serviceName,
+//      vertex.serviceColumn.columnName,
+//      vertex.innerIdVal,
+//      vertex.serviceColumn.innerValsToProps(vertex.props),
+//      vertex.ts,
+//      GraphUtil.fromOp(vertex.op)
+//    )
+//  }
+//}
+//
+//case class S2Vertex(graph: Graph,
+//                    serviceName: String,
+//                    columnName: String,
+//                    id: Any,
+//                    props: Map[String, Any] = Map.empty,
+//                    ts: Long = System.currentTimeMillis(),
+//                    operation: String = "insert") extends GraphElement {
+//  lazy val vertex = {
+//    val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+//    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+//    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+//
+//    val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion))
+//    val propsInner = column.propsToInnerVals(props) ++
+//      Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
+//
+//    Vertex(srcVertexId, ts, propsInner, op)
+//  }
+//
+//  val uniqueId = (serviceName, columnName, id)
+//
+//  override def isAsync: Boolean = vertex.isAsync
+//
+//  override def toLogString(): String = vertex.toLogString()
+//
+//  override def queueKey: String = vertex.queueKey
+//
+//  override def queuePartitionKey: String = vertex.queuePartitionKey
+//}
 case class Vertex(id: VertexId,
                   ts: Long = System.currentTimeMillis(),
                   props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
@@ -31,10 +74,19 @@ case class Vertex(id: VertexId,
 
   val innerId = id.innerId
 
+  val innerIdVal = innerId.value
+
+  lazy val properties = for {
+    (k, v) <- props
+    meta <- serviceColumn.metasMap.get(k)
+  } yield meta.name -> v.value
+
   def schemaVer = serviceColumn.schemaVersion
 
   def serviceColumn = ServiceColumn.findById(id.colId)
 
+  def columnName = serviceColumn.columnName
+
   def service = Service.findById(serviceColumn.serviceId)
 
   lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName)
@@ -114,7 +166,21 @@ object Vertex {
 
   def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue
 
-  //  val emptyVertex = Vertex(new CompositeId(CompositeId.defaultColId, CompositeId.defaultInnerId, false, true),
-  //    System.currentTimeMillis())
-  def fromString(s: String): Option[Vertex] = Graph.toVertex(s)
+  /**
+   * Builds a [[Vertex]] from a service name, a column name, a raw id and name-keyed properties.
+   *
+   * @throws RuntimeException when the service, the column or the operation cannot be resolved.
+   */
+  def toVertex(serviceName: String, columnName: String, id: Any, props: Map[String, Any] = Map.empty,
+               ts: Long = System.currentTimeMillis(), operation: String = "insert"): Vertex = {
+    def fail(message: String) = throw new RuntimeException(message)
+    val service = Service.findByName(serviceName).getOrElse(fail(s"$serviceName is not found."))
+    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(fail(s"$columnName is not found."))
+    val op = GraphUtil.toOp(operation).getOrElse(fail(s"$operation is not supported."))
+    val vertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion))
+    // the timestamp is also stored as a property, keyed by ColumnMeta.timeStampSeq
+    val innerProps = column.propsToInnerVals(props) +
+      (ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
+    new Vertex(vertexId, ts, innerProps, op)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
index 9bd172d..cd825f4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
@@ -25,7 +25,8 @@ import scalikejdbc._
 import scala.util.Random
 
 object Experiment extends Model[Experiment] {
-  val impressionKey = "S2-Impression-Id"
+  val ImpressionKey = "S2-Impression-Id"
+  val ImpressionId = "Impression-Id"
 
   def apply(rs: WrappedResultSet): Experiment = {
     Experiment(rs.intOpt("id"),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 2734211..f7318f6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -19,13 +19,15 @@
 
 package org.apache.s2graph.core.mysqls
 
+import java.util.Calendar
 
 import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
 import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.JSONParser._
-import play.api.libs.json._
+import play.api.libs.json.{JsObject, JsValue, Json}
 import scalikejdbc._
 
 object Label extends Model[Label] {
@@ -48,6 +50,7 @@ object Label extends Model[Label] {
     Label.delete(id.get)
   }
 
+
   def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
     val cacheKey = "label=" + labelName
     lazy val labelOpt =
@@ -292,11 +295,16 @@ case class Label(id: Option[Int], label: String,
   lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found"))
   lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found"))
 
-  lazy val direction = if (isDirected) "out" else "undirected"
   lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq)
 
   //TODO: Make sure this is correct
+
+//  lazy val metas = metas(useCache = true)
   lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
+  lazy val labelMetas = LabelMeta.findAllByLabelId(id.get, useCache = true)
+  lazy val labelMetaSet = labelMetas.toSet
+  lazy val labelMetaMap = (labelMetas ++ LabelMeta.reservedMetas).map(m => m.seq -> m).toMap
+
   lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap
   lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap
   lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap
@@ -327,14 +335,37 @@ case class Label(id: Option[Int], label: String,
     jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
   } yield prop.name -> jsValue).toMap
 
+  lazy val metaPropsDefaultMapInnerString = (for {
+    prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
+  } yield prop.name -> innerVal).toMap
+
   lazy val metaPropsDefaultMapInner = (for {
     prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
+  } yield prop -> innerVal).toMap
+  lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq
+  lazy val metaPropsJsValueWithDefault = (for {
+    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
     jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
-  } yield prop.name -> jsValue).toMap
+  } yield prop -> jsValue).toMap
+//  lazy val extraOptions = Model.extraOptions(Option("""{
+//    "storage": {
+//      "s2graph.storage.backend": "rocks",
+//      "rocks.db.path": "/tmp/db"
+//    }
+//  }"""))
 
   lazy val extraOptions: Map[String, JsValue] = options match {
     case None => Map.empty
-    case Some(v) => Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
+    case Some(v) =>
+      try {
+        Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
+      } catch {
+        case e: Exception =>
+          logger.error(s"An error occurs while parsing the extra label option: ${label}", e)
+          Map.empty
+      }
   }
 
   def srcColumnWithDir(dir: Int) = {
@@ -386,5 +417,23 @@ case class Label(id: Option[Int], label: String,
   )
 
 
+  /** Converts name-keyed props to (meta seq -> inner value stamped with `ts`); unknown names are dropped. */
+  def propsToInnerValsWithTs(props: Map[String, Any],
+                             ts: Long = System.currentTimeMillis()): Map[Byte, InnerValLikeWithTs] =
+    props.flatMap { case (name, value) =>
+      metaPropsInvMap.get(name).map { meta =>
+        val converted = toInnerVal(value.toString, meta.dataType, schemaVersion)
+        meta.seq -> InnerValLikeWithTs(converted, ts)
+      }
+    }
+
+  /**
+   * Converts (meta seq -> timestamped inner value) to (meta name -> the inner value's `value`);
+   * seqs that have no entry in `metaPropsMap` are dropped.
+   */
+  def innerValsWithTsToProps(props: Map[Byte, InnerValLikeWithTs]): Map[String, Any] =
+    props.flatMap { case (seq, valueWithTs) =>
+      metaPropsMap.get(seq).map(meta => meta.name -> valueWithTs.innerVal.value)
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index b68fa79..6fceabc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -24,6 +24,8 @@ package org.apache.s2graph.core.mysqls
  */
 
 import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, InnerValLike}
 import play.api.libs.json.Json
 import scalikejdbc._
 object ServiceColumn extends Model[ServiceColumn] {
@@ -89,13 +91,40 @@ object ServiceColumn extends Model[ServiceColumn] {
     })
   }
 }
-case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) {
+case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String)  {
 
   lazy val service = Service.findById(serviceId)
   lazy val metas = ColumnMeta.findAllByColumn(id.get)
+  lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta } toMap
   lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap
   lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap
   lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
 
+  /** Resolves prop names through `metasInvMap`; names without a column meta are skipped. */
+  def propsToInnerVals(props: Map[String, Any]): Map[Int, InnerValLike] =
+    props.flatMap { case (name, raw) =>
+      metasInvMap.get(name).map { meta =>
+        meta.seq.toInt -> toInnerVal(raw.toString, meta.dataType, schemaVersion)
+      }
+    }
+
+  /** Maps seq-keyed inner values back to column-meta names with their raw values;
+    * seqs without an entry in `metasMap` are left out. */
+  def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] =
+    props.flatMap { case (seq, innerVal) =>
+      metasMap.get(seq).map { meta =>
+        meta.name -> innerVal.value
+      }
+    }
+
+  /** Same lookup as `metasMap.get` per seq, but unwraps the timestamped wrapper first;
+    * seqs without an entry in `metasMap` are left out. */
+  def innerValsWithTsToProps(props: Map[Int, InnerValLikeWithTs]): Map[String, Any] =
+    props.flatMap { case (seq, valueWithTs) =>
+      metasMap.get(seq).map { meta =>
+        meta.name -> valueWithTs.innerVal.value
+      }
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index 5920f3c..1c93667 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.parsers
 import org.apache.s2graph.core.GraphExceptions.WhereParserException
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.types.InnerValLike
-import org.apache.s2graph.core.{Edge, GraphExceptions, JSONParser}
+import org.apache.s2graph.core.Edge
 import org.apache.s2graph.core.JSONParser._
 import scala.annotation.tailrec
 import scala.util.Try

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 52ee50d..d77ac7d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -36,9 +36,11 @@ import scala.util.{Failure, Success, Try}
 object TemplateHelper {
   val findVar = """\"?\$\{(.*?)\}\"?""".r
   val num = """(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r
+
   val hour = 60 * 60 * 1000L
   val day = hour * 24L
   val week = day * 7L
+
   def calculate(now: Long, n: Int, unit: String): Long = {
     val duration = unit match {
       case "hour" | "HOUR" => n * hour
@@ -72,12 +74,32 @@ object TemplateHelper {
   }
 }
 
-class RequestParser(config: Config) {
+/** Stateless helpers and shared constants used by the [[RequestParser]] class. */
+object RequestParser {
+  /** One experiment request as (body, accessToken, experimentName, uuid, impKeyOpt). */
+  type ExperimentParam = (JsObject, String, String, String, Option[String])
+  val defaultLimit = 100
+  /** Wraps a JSON object in a one-element list, expands a JSON array, otherwise returns Nil. */
+  def toJsValues(jsValue: JsValue): List[JsValue] = jsValue match {
+    case single: JsObject => single :: Nil
+    case many: JsArray => many.as[List[JsValue]]
+    case _ => Nil
+  }
+
+  /** Returns the bare text of a JSON string; any other value is rendered via `toString()`. */
+  def jsToStr(js: JsValue): String = js match {
+    case JsString(text) => text
+    case other => other.toString()
+  }
+}
+
+class RequestParser(graph: Graph) {
 
   import Management.JsonModel._
+  import RequestParser._
 
-  val hardLimit = 100000
-  val defaultLimit = 100
+  val config = graph.config
+  val hardLimit = config.getInt("query.hardlimit")
   val maxLimit = Int.MaxValue - 1
   val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout")
   val DefaultMaxAttempt = config.getInt("hbase.client.retries.number")
@@ -106,13 +128,11 @@ class RequestParser(config: Config) {
         (labelOrderType.seq, value)
       }
     }
+
     ret
   }
 
-  def extractInterval(label: Label, _jsValue: JsValue) = {
-    val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString())
-    val jsValue = Json.parse(replaced)
-
+  def extractInterval(label: Label, jsValue: JsValue) = {
     def extractKv(js: JsValue) = js match {
       case JsObject(map) => map.toSeq
       case JsArray(arr) => arr.flatMap {
@@ -135,15 +155,21 @@ class RequestParser(config: Config) {
     ret
   }
 
-  def extractDuration(label: Label, _jsValue: JsValue) = {
-    val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString())
-    val jsValue = Json.parse(replaced)
-
+  def extractDuration(label: Label, jsValue: JsValue) = {
     for {
       js <- parseOption[JsObject](jsValue, "duration")
     } yield {
-      val minTs = parseOption[Long](js, "from").getOrElse(Long.MaxValue)
-      val maxTs = parseOption[Long](js, "to").getOrElse(Long.MinValue)
+      val minTs = (js \ "from").get match {
+        case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong
+        case JsNumber(n) => n.toLong
+        case _ => Long.MinValue
+      }
+
+      val maxTs = (js \ "to").get match {
+        case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong
+        case JsNumber(n) => n.toLong
+        case _ => Long.MaxValue
+      }
 
       if (minTs > maxTs) {
         throw new BadQueryException("Duration error. Timestamp of From cannot be larger than To.")
@@ -167,21 +193,24 @@ class RequestParser(config: Config) {
     }
     ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike])
   }
-  
+
   def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = {
     whereClauseOpt match {
       case None => Success(WhereParser.success)
-      case Some(_where) =>
-        val where = TemplateHelper.replaceVariable(System.currentTimeMillis(), _where)
+      case Some(where) =>
         val whereParserKey = s"${label.label}_${where}"
+
         parserCache.get(whereParserKey, new Callable[Try[Where]] {
           override def call(): Try[Where] = {
-            WhereParser(label).parse(where) match {
+            val _where = TemplateHelper.replaceVariable(System.currentTimeMillis(), where)
+
+            WhereParser(label).parse(_where) match {
               case s@Success(_) => s
               case Failure(ex) => throw BadQueryException(ex.getMessage, ex)
             }
           }
         })
+
     }
   }
 
@@ -194,28 +223,38 @@ class RequestParser(config: Config) {
     } yield {
       Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis())
     }
-    vertices.toSeq
+
+    vertices
   }
 
-  def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery = {
+  def toMultiQuery(jsValue: JsValue, impIdOpt: Option[String]): MultiQuery = {
     val queries = for {
       queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty)
     } yield {
-      toQuery(queryJson, isEdgeQuery)
+      toQuery(queryJson, impIdOpt = impIdOpt)
     }
     val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0))
-    MultiQuery(queries = queries, weights = weights,
-      queryOption = toQueryOption(jsValue), jsonQuery = jsValue)
+    MultiQuery(queries = queries, weights = weights, queryOption = toQueryOption(jsValue, impIdOpt))
   }
 
-  def toQueryOption(jsValue: JsValue): QueryOption = {
+  def toQueryOption(jsValue: JsValue, impIdOpt: Option[String]): QueryOption = {
     val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name))
-    val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q =>
+    val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v, impIdOpt = impIdOpt) }.map { q =>
       q.copy(queryOption = q.queryOption.copy(filterOutFields = filterOutFields))
     }
     val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true)
     val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty)
-    val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty)
+//    val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty)
+    val groupBy = (jsValue \ "groupBy").asOpt[JsValue].getOrElse(JsNull) match {
+      case obj: JsObject =>
+        val keys = (obj \ "key").asOpt[Seq[String]].getOrElse(Nil)
+        val groupByLimit = (obj \ "limit").asOpt[Int].getOrElse(hardLimit)
+        GroupBy(keys, groupByLimit)
+      case arr: JsArray =>
+        val keys = arr.asOpt[Seq[String]].getOrElse(Nil)
+        GroupBy(keys)
+      case _ => GroupBy.Empty
+    }
     val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs =>
       for {
         js <- jsLs
@@ -238,7 +277,7 @@ class RequestParser(config: Config) {
 
     QueryOption(removeCycle = removeCycle,
       selectColumns = selectColumns,
-      groupByColumns = groupByColumns,
+      groupBy = groupBy,
       orderByColumns = orderByColumns,
       filterOutQuery = filterOutQuery,
       filterOutFields = filterOutFields,
@@ -247,10 +286,12 @@ class RequestParser(config: Config) {
       limitOpt = limitOpt,
       returnAgg = returnAgg,
       scoreThreshold = scoreThreshold,
-      returnDegree = returnDegree
+      returnDegree = returnDegree,
+      impIdOpt = impIdOpt
     )
   }
-  def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = {
+
+  def toQuery(jsValue: JsValue, impIdOpt: Option[String]): Query = {
     try {
       val vertices =
         (for {
@@ -274,7 +315,7 @@ class RequestParser(config: Config) {
       if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty")
       val steps = parse[Vector[JsValue]](jsValue, "steps")
 
-      val queryOption = toQueryOption(jsValue)
+      val queryOption = toQueryOption(jsValue, impIdOpt)
 
       val querySteps =
         steps.zipWithIndex.map { case (step, stepIdx) =>
@@ -332,7 +373,7 @@ class RequestParser(config: Config) {
 
         }
 
-      val ret = Query(vertices, querySteps, queryOption, jsValue)
+      val ret = Query(vertices, querySteps, queryOption)
       //      logger.debug(ret.toString)
       ret
     } catch {
@@ -354,10 +395,8 @@ class RequestParser(config: Config) {
       val limit = {
         parseOption[Int](labelGroup, "limit") match {
           case None => defaultLimit
-          case Some(l) if l < 0 => maxLimit
-          case Some(l) if l >= 0 =>
-            val default = hardLimit
-            Math.min(l, default)
+          case Some(l) if l < 0 => hardLimit
+          case Some(l) if l >= 0 => Math.min(l, hardLimit)
         }
       }
       val offset = parseOption[Int](labelGroup, "offset").getOrElse(0)
@@ -405,7 +444,6 @@ class RequestParser(config: Config) {
       // FIXME: Order of command matter
       QueryParam(labelWithDir)
         .sample(sample)
-        .limit(offset, limit)
         .rank(RankParam(label.id.get, scoring))
         .exclude(exclude)
         .include(include)
@@ -413,6 +451,7 @@ class RequestParser(config: Config) {
         .has(hasFilter)
         .labelOrderSeq(indexSeq)
         .interval(interval)
+        .limit(offset, limit)
         .where(where)
         .duplicatePolicy(duplicate)
         .includeDegree(includeDegree)
@@ -450,21 +489,8 @@ class RequestParser(config: Config) {
     }
   }
 
-  def toJsValues(jsValue: JsValue): List[JsValue] = {
-    jsValue match {
-      case obj: JsObject => List(obj)
-      case arr: JsArray => arr.as[List[JsValue]]
-      case _ => List.empty[JsValue]
-    }
-  }
-
-  def jsToStr(js: JsValue): String = js match {
-    case JsString(s) => s
-    case _ => js.toString()
-  }
-
   def parseBulkFormat(str: String): Seq[(GraphElement, String)] = {
-    val edgeStrs = str.split("\\n")
+    val edgeStrs = str.split("\\n").filterNot(_.isEmpty)
     val elementsWithTsv = for {
       edgeStr <- edgeStrs
       str <- GraphUtil.parseString(edgeStr)
@@ -480,24 +506,23 @@ class RequestParser(config: Config) {
   }
 
   private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, String)] = {
-    val srcId = (jsValue \ "from").asOpt[JsValue].map(jsToStr)
-    val tgtId = (jsValue \ "to").asOpt[JsValue].map(jsToStr)
-    val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(jsToStr)) ++ srcId
-    val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(tos => tos.map(jsToStr)) ++ tgtId
+    val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil)
+    val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil)
 
     val label = parse[String](jsValue, "label")
     val timestamp = parse[Long](jsValue, "timestamp")
-    val direction = parseOption[String](jsValue, "direction").getOrElse("")
-    val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
+    val direction = parseOption[String](jsValue, "direction").getOrElse("out")
+    val propsJson = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
 
     for {
-      srcId <- srcIds
-      tgtId <- tgtIds
+      srcId <- srcIds.flatMap(jsValueToAny(_).toSeq)
+      tgtId <- tgtIds.flatMap(jsValueToAny(_).toSeq)
     } yield {
-      val edge = Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString)
+      //      val edge = Management.toEdge(graph, timestamp, operation, srcId, tgtId, label, direction, fromJsonToProperties(propsJson))
+      val edge = Edge.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation)
       val tsv = (jsValue \ "direction").asOpt[String] match {
-        case None => Seq(timestamp, operation, "e", srcId, tgtId, label, props).mkString("\t")
-        case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, props, dir).mkString("\t")
+        case None => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString).mkString("\t")
+        case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString, dir).mkString("\t")
       }
 
       (edge, tsv)
@@ -513,8 +538,8 @@ class RequestParser(config: Config) {
     val ts = parseOption[Long](jsValue, "timestamp").getOrElse(System.currentTimeMillis())
     val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get
     val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get
-    val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
-    Management.toVertex(ts, operation, id.toString, sName, cName, props.toString)
+    val props = fromJsonToProperties((jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()))
+    Vertex.toVertex(sName, cName, id.toString, props, ts, operation)
   }
 
   def toPropElements(jsObj: JsValue) = Try {
@@ -637,7 +662,7 @@ class RequestParser(config: Config) {
 
   def toDeleteParam(json: JsValue) = {
     val labelName = (json \ "label").as[String]
-    val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil).filterNot(_.isAsync)
+    val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil)
     val direction = (json \ "direction").asOpt[String].getOrElse("out")
 
     val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
@@ -645,4 +670,30 @@ class RequestParser(config: Config) {
     val vertices = toVertices(labelName, direction, ids)
     (labels, direction, ids, ts, vertices)
   }
+
+  /** Extracts (label, from, to, direction, index, props) from a fetch-and-delete request;
+    * "label" is read with `as` (must be present), "direction" falls back to "out". */
+  def toFetchAndDeleteParam(json: JsValue) = (
+    (json \ "label").as[String],
+    (json \ "from").asOpt[JsValue],
+    (json \ "to").asOpt[JsValue],
+    (json \ "direction").asOpt[String].getOrElse("out"),
+    (json \ "index").asOpt[String],
+    (json \ "props").asOpt[JsObject])
+
+  /** Parses a JSON array of experiment requests; throws RuntimeException("<field> not found") for a missing field. */
+  def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = jsQuery.as[Seq[JsObject]].map { obj =>
+    def _require(field: String) = throw new RuntimeException(s"${field} not found")
+    val accessToken = (obj \ "accessToken").asOpt[String].getOrElse(_require("accessToken"))
+    val experimentName = (obj \ "experiment").asOpt[String].getOrElse(_require("experiment"))
+    // asOpt instead of .get: an absent "#uuid" now reaches _require rather than NoSuchElementException
+    val uuid = (obj \ "#uuid").asOpt[JsValue] match {
+      case Some(JsString(s)) => s
+      case Some(JsNumber(n)) => n.toString
+      case _ => _require("#uuid")
+    }
+    val body = (obj \ "params").asOpt[JsObject].getOrElse(Json.obj())
+    val impKeyOpt = (obj \ Experiment.ImpressionKey).asOpt[String]
+    (body, accessToken, experimentName, uuid, impKeyOpt)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 55b3e79..4c77ad6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -21,7 +21,8 @@ package org.apache.s2graph.core.rest
 
 import java.net.URL
 
-import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
 import org.apache.s2graph.core.utils.logger
@@ -29,8 +30,21 @@ import play.api.libs.json._
 
 import scala.concurrent.{ExecutionContext, Future}
 
-
 object RestHandler {
+  /** Type class: reads a header value by key from a container `A`; instances carry explicit types. */
+  trait CanLookup[A] {
+    def lookup(m: A, key: String): Option[String]
+  }
+  object CanLookup {
+    implicit val oneTupleLookup: CanLookup[(String, String)] = new CanLookup[(String, String)] {
+      override def lookup(m: (String, String), key: String): Option[String] =
+        if (m._1 == key) Option(m._2) else None
+    }
+    implicit val hashMapLookup: CanLookup[Map[String, String]] = new CanLookup[Map[String, String]] {
+      override def lookup(m: Map[String, String], key: String): Option[String] = m.get(key)
+    }
+  }
+
   case class HandlerResult(body: Future[JsValue], headers: (String, String)*)
 }
 
@@ -41,25 +55,31 @@ object RestHandler {
 class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
 
   import RestHandler._
-  val requestParser = new RequestParser(graph.config)
+  val requestParser = new RequestParser(graph)
+
 
   /**
     * Public APIS
     */
-  def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): HandlerResult = {
+  def doPost[A](uri: String, body: String, headers: A)(implicit ev: CanLookup[A]): HandlerResult = {
+    val impKeyOpt = ev.lookup(headers, Experiment.ImpressionKey)
+    val impIdOpt = ev.lookup(headers, Experiment.ImpressionId)
+
     try {
       val jsQuery = Json.parse(body)
 
       uri match {
-        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
-        case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
-        case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
-        case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+//        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toSimpleVertexArrJson))
+        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toJson))
+//        case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
+//        case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
+//        case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
         case "/graphs/checkEdges" => checkEdges(jsQuery)
-        case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
-        case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
-        case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+//        case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
+//        case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
+//        case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
         case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery))
+        case "/graphs/experiments" => experiments(jsQuery)
         case uri if uri.startsWith("/graphs/experiment") =>
           val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3)
           experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt)
@@ -75,17 +95,8 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
     try {
       val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue)
 
-      HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs =>
-        val edgeJsons = for {
-          queryRequestWithResult <- queryRequestWithResultLs
-          (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-          edgeWithScore <- queryResult.edgeWithScoreLs
-          (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-          convertedEdge = if (isReverted) edge.duplicateEdge else edge
-          edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
-        } yield Json.toJson(edgeJson)
-
-        Json.toJson(edgeJsons)
+      HandlerResult(graph.checkEdges(quads).map { case stepResult =>
+        PostProcess.toJson(graph, QueryOption(), stepResult)
       })
     } catch {
       case e: Exception => HandlerResult(Future.failed(e))
@@ -93,11 +104,23 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
   }
 
 
-  /**
-    * Private APIS
-    */
-  private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String]): HandlerResult = {
+  /** Runs every experiment listed in the JSON array and collects the responses into a
+    * single JSON array. */
+  private def experiments(jsQuery: JsValue): HandlerResult = {
+    val bodies = requestParser.parseExperiment(jsQuery).map {
+      case (body, token, experimentName, uuid, impKeyOpt) =>
+        // an Exception from one experiment is recovered into an empty result plus its reason
+        experiment(body, token, experimentName, uuid, impKeyOpt).body.recover {
+          case e: Exception =>
+            PostProcess.emptyResults ++ Json.obj("error" -> Json.obj("reason" -> e.getMessage))
+        }
+    }
+
+    val merged = Future.sequence(bodies).map(JsArray)
+    HandlerResult(body = merged)
+  }
 
+  private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String] = None): HandlerResult = {
     try {
       val bucketOpt = for {
         service <- Service.findByAccessToken(accessToken)
@@ -108,7 +131,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
       val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found"))
       if (bucket.isGraphQuery) {
         val ret = buildRequestInner(contentsBody, bucket, uuid)
-        HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId)
+        HandlerResult(ret.body, Experiment.ImpressionKey -> bucket.impressionId)
       }
       else throw new RuntimeException("not supported yet")
     } catch {
@@ -127,119 +150,54 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
       val experimentLog = s"POST $path took -1 ms 200 -1 $body"
       logger.debug(experimentLog)
 
-      doPost(path, body)
-    }
-  }
-
-  private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
-    val filterOutQueryResultsLs = q.filterOutQuery match {
-      case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
-      case None => Future.successful(Seq.empty)
-    }
-
-    for {
-      queryResultsLs <- graph.getEdges(q)
-      filterOutResultsLs <- filterOutQueryResultsLs
-    } yield {
-      val json = post(queryResultsLs, filterOutResultsLs)
-      json
+      doPost(path, body, Experiment.ImpressionId -> bucket.impressionId)
     }
   }
 
-  def getEdgesAsync(jsonQuery: JsValue)
-                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-
-    val fetch = eachQuery(post) _
+  def getEdgesAsync(jsonQuery: JsValue, impIdOpt: Option[String] = None)
+                   (post: (Graph, QueryOption, StepResult) => JsValue): Future[JsValue] = {
     jsonQuery match {
-      case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
       case obj@JsObject(_) =>
         (obj \ "queries").asOpt[JsValue] match {
-          case None => fetch(requestParser.toQuery(obj))
+          case None =>
+            val query = requestParser.toQuery(obj, impIdOpt)
+            graph.getEdges(query).map(post(graph, query.queryOption, _))
           case _ =>
-            val multiQuery = requestParser.toMultiQuery(obj)
-            val filterOutFuture = multiQuery.queryOption.filterOutQuery match {
-              case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
-              case None => Future.successful(Seq.empty)
-            }
-            val futures = multiQuery.queries.zip(multiQuery.weights).map { case (query, weight) =>
-              val filterOutQueryResultsLs = query.queryOption.filterOutQuery match {
-                case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
-                case None => Future.successful(Seq.empty)
-              }
-              for {
-                queryRequestWithResultLs <- graph.getEdges(query)
-                filterOutResultsLs <- filterOutQueryResultsLs
-              } yield {
-                val newQueryRequestWithResult = for {
-                  queryRequestWithResult <- queryRequestWithResultLs
-                  queryResult = queryRequestWithResult.queryResult
-                } yield {
-                  val newEdgesWithScores = for {
-                    edgeWithScore <- queryRequestWithResult.queryResult.edgeWithScoreLs
-                  } yield {
-                    edgeWithScore.copy(score = edgeWithScore.score * weight)
-                  }
-                  queryRequestWithResult.copy(queryResult = queryResult.copy(edgeWithScoreLs = newEdgesWithScores))
-                }
-                logger.debug(s"[Size]: ${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}")
-                (newQueryRequestWithResult, filterOutResultsLs)
-              }
-            }
-            for {
-              filterOut <- filterOutFuture
-              resultWithExcludeLs <- Future.sequence(futures)
-            } yield {
-              PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, resultWithExcludeLs, filterOut)
-              //              val initial = (ListBuffer.empty[QueryRequestWithResult], ListBuffer.empty[QueryRequestWithResult])
-              //              val (results, excludes) = resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), (results, excludes)) =>
-              //                (prevResults ++= results, prevExcludes ++= excludes)
-              //              }
-              //              PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ filterOut)
-            }
+            val multiQuery = requestParser.toMultiQuery(obj, impIdOpt)
+            graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _))
         }
-      case _ => throw BadQueryException("Cannot support")
-    }
-  }
-
-  private def getEdgesExcludedAsync(jsonQuery: JsValue)
-                                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-    val q = requestParser.toQuery(jsonQuery)
-    val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
 
-    val fetchFuture = graph.getEdges(q)
-    val excludeFuture = graph.getEdges(filterOutQuery)
+      case JsArray(arr) =>
+        val queries = arr.map(requestParser.toQuery(_, impIdOpt))
+        val weights = queries.map(_ => 1.0)
+        val multiQuery = MultiQuery(queries, weights, QueryOption(), jsonQuery)
+        graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _))
 
-    for {
-      queryResultLs <- fetchFuture
-      exclude <- excludeFuture
-    } yield {
-      post(queryResultLs, exclude)
+      case _ => throw BadQueryException("Cannot support")
     }
   }
 
   private def getVertices(jsValue: JsValue) = {
     val jsonQuery = jsValue
-    val ts = System.currentTimeMillis()
-    val props = "{}"
 
     val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
       val serviceName = (js \ "serviceName").as[String]
       val columnName = (js \ "columnName").as[String]
-      for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
-        Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
+      for {
+        idJson <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])
+        id <- jsValueToAny(idJson)
+      } yield {
+        Vertex.toVertex(serviceName, columnName, id)
       }
     }
 
     graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
   }
 
+
   private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = {
     var body = bucket.requestBody.replace("#uuid", uuid)
 
-    //    // replace variable
-    //    body = TemplateHelper.replaceVariable(System.currentTimeMillis(), body)
-
-    // replace param
     for {
       requestKeyJson <- requestKeyJsonOpt
       jsObj <- requestKeyJson.asOpt[JsObject]

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index eaa25af..a6e81b4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -19,12 +19,10 @@
 
 package org.apache.s2graph.core.storage
 
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.{Executors, TimeUnit}
 
 import com.typesafe.config.Config
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.s2graph.core.ExceptionHandler.{KafkaMessage, Key, Val}
 import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
@@ -37,13 +35,15 @@ import org.apache.s2graph.core.utils.{Extensions, logger}
 import scala.annotation.tailrec
 import scala.collection.Seq
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Promise, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.{Random, Try}
 
-abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
+abstract class Storage[R](val graph: Graph,
+                          val config: Config)(implicit ec: ExecutionContext) {
   import HBaseType._
 
   /** storage dependent configurations */
+  val DeleteAllFetchCount = config.getInt("delete.all.fetch.count")
   val MaxRetryNum = config.getInt("max.retry.number")
   val MaxBackOff = config.getInt("max.back.off")
   val BackoffTimeout = config.getInt("back.off.timeout")
@@ -57,11 +57,15 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
   /** retry scheduler */
   val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
 
+
   /** handle mutate failed */
   val exceptionHandler = new ExceptionHandler(config)
-
   val failTopic = s"mutateFailed_${config.getString("phase")}"
 
+  /** fallback */
+  val fallback = Future.successful(StepResult.Empty)
+  val innerFallback = Future.successful(StepInnerResult.Empty)
+
   /**
    * Compatibility table
    * | label schema version | snapshot edge | index edge | vertex | note |
@@ -229,7 +233,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
    * @return
    */
   def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)],
-              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]]
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepInnerResult]]
 
   /**
    * fetch Vertex for given request from storage.
@@ -324,7 +328,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
 
   def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = {
     val (strongEdges, weakEdges) =
-      edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk"))
+      (edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")))
 
     val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map { case (zkQuorum, edges) =>
       val mutations = edges.flatMap { edge =>
@@ -449,7 +453,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
       future recoverWith {
         case FetchTimeoutException(retryEdge) =>
           logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
-          /* fetch failed. re-fetch should be done */
+          /** fetch failed. re-fetch should be done */
           fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
             retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
           }
@@ -465,14 +469,14 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
           }
           logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
 
-          /* retry logic */
+          /** retry logic */
           val promise = Promise[Boolean]
           val backOff = exponentialBackOff(tryNum)
           scheduledThreadPool.schedule(new Runnable {
             override def run(): Unit = {
               val future = if (failedStatusCode == 0) {
                 // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
-                /* fetch failed. re-fetch should be done */
+                /** fetch failed. re-fetch should be done */
                 fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
                   retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
                 }
@@ -505,7 +509,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
       case 0 =>
         fetchedSnapshotEdgeOpt match {
           case None =>
-          /*
+          /**
            * no one has never mutated this SN.
            * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
            * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1)
@@ -527,7 +531,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
           case Some(snapshotEdge) =>
             snapshotEdge.pendingEdgeOpt match {
               case None =>
-                /*
+                /**
                  * others finished commit on this SN. but there is no contention.
                  * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges)
                  * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ?
@@ -549,7 +553,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
               case Some(pendingEdge) =>
                 val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
                 if (isLockExpired) {
-                  /*
+                  /**
                    * if pendingEdge.ts == snapshotEdge.ts =>
                    *    (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge))
                    * else =>
@@ -571,7 +575,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
 
                   commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
                 } else {
-                  /*
+                  /**
                    * others finished commit on this SN and there is currently contention.
                    * this can't be proceed so retry from re-fetch.
                    * throw EX
@@ -584,11 +588,11 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
         }
       case _ =>
 
-        /*
+        /**
          * statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock
          */
 
-        /*
+        /**
          * this succeed to lock this SN. keep doing on commit process.
          * if SN.isEmpty =>
          * no one never succed to commit on this SN.
@@ -828,90 +832,96 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
 
 
   /** Delete All */
-  protected def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest,
-                                              queryResult: QueryResult,
+  protected def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepInnerResult,
                                               requestTs: Long,
                                               retryNum: Int): Future[Boolean] = {
-    val queryParam = queryRequest.queryParam
-    val zkQuorum = queryParam.label.hbaseZkAddr
-    val futures = for {
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-    } yield {
-        /* reverted direction */
-        val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
-          indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
-            buildIncrementsAsync(indexEdge, -1L)
-        }
-        val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-        val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
-          indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
-            buildIncrementsAsync(indexEdge, -1L)
+    if (stepInnerResult.isEmpty) Future.successful(true)
+    else {
+      val head = stepInnerResult.edgesWithScoreLs.head
+      val zkQuorum = head.edge.label.hbaseZkAddr
+      val futures = for {
+        edgeWithScore <- stepInnerResult.edgesWithScoreLs
+        (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+      } yield {
+          /** reverted direction */
+          val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+            indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+              buildIncrementsAsync(indexEdge, -1L)
+          }
+          val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+          val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
+            indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+              buildIncrementsAsync(indexEdge, -1L)
+          }
+          val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+          writeToStorage(zkQuorum, mutations, withWait = true)
         }
-        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-        writeToStorage(zkQuorum, mutations, withWait = true)
-      }
 
-    Future.sequence(futures).map { rets => rets.forall(identity) }
+      Future.sequence(futures).map { rets => rets.forall(identity) }
+    }
   }
 
-  protected def buildEdgesToDelete(queryRequestWithResultLs: QueryRequestWithResult, requestTs: Long): QueryResult = {
-    val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs).get
-    val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore =>
+  protected def buildEdgesToDelete(stepInnerResult: StepInnerResult, requestTs: Long): StepInnerResult = {
+    val filtered = stepInnerResult.edgesWithScoreLs.filter { edgeWithScore =>
       (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
-    }.map { edgeWithScore =>
-      val label = queryRequest.queryParam.label
-      val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
-        case "strong" =>
-          val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
-            Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
-          (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
-        case _ =>
-          val oldEdge = edgeWithScore.edge
-          (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
-      }
+    }
+    if (filtered.isEmpty) StepInnerResult.Empty
+    else {
+      val head = filtered.head
+      val label = head.edge.label
+      val edgeWithScoreLs = filtered.map { edgeWithScore =>
+        val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
+          case "strong" =>
+            val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
+              Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
+            (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
+          case _ =>
+            val oldEdge = edgeWithScore.edge
+            (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
+        }
 
-      val copiedEdge =
-        edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
+        val copiedEdge =
+          edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
 
-      val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
-//      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
-      edgeToDelete
+        val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
+        //      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
+        edgeToDelete
+      }
+      //Degree edge?
+      StepInnerResult(edgeWithScoreLs, Nil, false)
     }
-
-    queryResult.copy(edgeWithScoreLs = edgeWithScoreLs)
   }
 
-  protected def deleteAllFetchedEdgesLs(queryRequestWithResultLs: Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = {
-    val queryResultLs = queryRequestWithResultLs.map(_.queryResult)
-    queryResultLs.foreach { queryResult =>
-      if (queryResult.isFailure) throw new RuntimeException("fetched result is fallback.")
+  protected def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepInnerResult],
+                                        requestTs: Long): Future[(Boolean, Boolean)] = {
+    stepInnerResultLs.foreach { stepInnerResult =>
+      if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
     }
     val futures = for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs)
-      if deleteQueryResult.edgeWithScoreLs.nonEmpty
+      stepInnerResult <- stepInnerResultLs
+      deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
+      if deleteStepInnerResult.edgesWithScoreLs.nonEmpty
     } yield {
-        val label = queryRequest.queryParam.label
+        val head = deleteStepInnerResult.edgesWithScoreLs.head
+        val label = head.edge.label
         label.schemaVersion match {
           case HBaseType.VERSION3 | HBaseType.VERSION4 =>
             if (label.consistencyLevel == "strong") {
-              /*
+              /**
                * read: snapshotEdge on queryResult = O(N)
                * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
                */
-              mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity))
+              mutateEdges(deleteStepInnerResult.edgesWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity))
             } else {
-              deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+              deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
             }
           case _ =>
 
-            /*
+            /**
              * read: x
              * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
              */
-            deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+            deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
         }
       }
 
@@ -923,10 +933,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     }
   }
 
-  protected def fetchAndDeleteAll(query: Query, requestTs: Long): Future[(Boolean, Boolean)] = {
+  protected def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
     val future = for {
-      queryRequestWithResultLs <- getEdges(query)
-      (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, requestTs)
+      stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_)))
+      (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
     } yield {
 //        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
         (allDeleted, ret)
@@ -960,19 +970,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     }
 
     val requestTs = ts
-    val queryParams = for {
+    /** create query per label */
+    val queries = for {
       label <- labels
     } yield {
         val labelWithDir = LabelWithDirection(label.id.get, dir)
-        QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
+        val queryParam = QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
+        val step = Step(List(queryParam))
+        Query(srcVertices, Vector(step))
       }
 
-    val step = Step(queryParams.toList)
-    val q = Query(srcVertices, Vector(step))
-
     //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
-    val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) {
-      fetchAndDeleteAll(q, requestTs)
+        val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
+      fetchAndDeleteAll(queries, requestTs)
     } { case (allDeleted, deleteSuccess) =>
       allDeleted && deleteSuccess
     }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
@@ -1039,7 +1049,9 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
                                queryParam: QueryParam,
                                prevScore: Double = 1.0,
                                isInnerCall: Boolean,
-                               parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+                               parentEdges: Seq[EdgeWithScore],
+                               startOffset: Int = 0,
+                               len: Int = Int.MaxValue): Seq[EdgeWithScore] = {
     if (kvs.isEmpty) Seq.empty
     else {
       val first = kvs.head
@@ -1050,7 +1062,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
         else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)
 
       for {
-        kv <- kvs
+        (kv, idx) <- kvs.zipWithIndex if idx >= startOffset && idx < startOffset + len
         edge <-
         if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, isInnerCall, parentEdges)
         else toEdge(kv, queryParam, cacheElementOpt, parentEdges)
@@ -1071,19 +1083,6 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
 
   /** End Of Parse Logic */
 
-//  /** methods for consistency */
-//  protected def writeAsyncSimple(zkQuorum: String, elementRpcs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
-//    if (elementRpcs.isEmpty) {
-//      Future.successful(true)
-//    } else {
-//      val futures = elementRpcs.map { rpc => writeToStorage(rpc, withWait) }
-//      Future.sequence(futures).map(_.forall(identity))
-//    }
-//  }
-
-
-  //  def futureCache[T] = Cache[Long, (Long, T)]
-
   protected def toRequestEdge(queryRequest: QueryRequest): Edge = {
     val srcVertex = queryRequest.vertex
     //    val tgtVertexOpt = queryRequest.tgtVertexOpt
@@ -1095,7 +1094,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
     val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match {
       case Some(tgtVertexId) => // _to is given.
-        /* we use toSnapshotEdge so dont need to swap src, tgt */
+        /** we use toSnapshotEdge so dont need to swap src, tgt */
         val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
         val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion)
         (src, tgt)
@@ -1135,27 +1134,26 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     }
   }
 
-  protected def fetchStep(orgQuery: Query, queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = {
-    if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil)
+  protected def fetchStep(orgQuery: Query,
+                          stepIdx: Int,
+                          stepInnerResult: StepInnerResult): Future[StepInnerResult] = {
+    if (stepInnerResult.isEmpty) Future.successful(StepInnerResult.Empty)
     else {
-      val queryRequest = queryRequestWithResultsLs.head.queryRequest
-      val q = orgQuery
-      val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult)
+      val edgeWithScoreLs = stepInnerResult.edgesWithScoreLs
 
-      val stepIdx = queryRequest.stepIdx + 1
+      val q = orgQuery
 
       val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
       val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
       val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
       val step = q.steps(stepIdx)
+
       val alreadyVisited =
         if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
-        else Graph.alreadyVisitedVertices(queryResultsLs)
+        else Graph.alreadyVisitedVertices(stepInnerResult.edgesWithScoreLs)
 
-      val groupedBy = queryResultsLs.flatMap { queryResult =>
-        queryResult.edgeWithScoreLs.map { case edgeWithScore =>
-          edgeWithScore.edge.tgtVertex -> edgeWithScore
-        }
+      val groupedBy = edgeWithScoreLs.map { case edgeWithScore =>
+        edgeWithScore.edge.tgtVertex -> edgeWithScore
       }.groupBy { case (vertex, edgeWithScore) => vertex }
 
       val groupedByFiltered = for {
@@ -1178,39 +1176,48 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
         queryParam <- step.queryParams
       } yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore)
 
-      Graph.filterEdges(fetches(queryRequests, prevStepTgtVertexIdEdges), alreadyVisited)(ec)
+      val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
+      Graph.filterEdges(orgQuery, stepIdx, queryRequests.map(_._1), fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited)(ec)
     }
   }
-
-  protected def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = {
-    for {
-      queryRequestWithResultLs <- queryRequestWithResultLsFuture
-      ret <- fetchStep(orgQuery, queryRequestWithResultLs)
-    } yield ret
+  /** Walks `q.steps` in order, passing each step's result to `fetchStep` as input for the next.
+   *  An empty step list, or an Exception thrown while building the chain, yields `innerFallback`;
+   *  failures inside the returned Future are not recovered here. */
+  private def getEdgesStepInner(q: Query): Future[StepInnerResult] = {
+    Try {
+      if (q.steps.isEmpty) innerFallback
+      else {
+        // seed result built by QueryResult.fromVertices(q); conceptually stepIdx = -1
+        val startStepInnerResult = QueryResult.fromVertices(q)
+        q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevFuture, (_, stepIdx)) =>
+          prevFuture.flatMap(prevStepInnerResult => fetchStep(q, stepIdx, prevStepInnerResult))
+        }
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesStepInner: $e", e)
+        innerFallback
+    } get
   }
-
-  def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = {
-    val fallback = {
-      val queryRequest = QueryRequest(query = q, stepIdx = 0, q.vertices.head, queryParam = QueryParam.Empty)
-      Future.successful(q.vertices.map(v => QueryRequestWithResult(queryRequest, QueryResult())))
-    }
+  def getEdges(q: Query): Future[StepResult] = {
     Try {
-
       if (q.steps.isEmpty) {
         // TODO: this should be get vertex query.
         fallback
       } else {
-        // current stepIdx = -1
-        val startQueryResultLs = QueryResult.fromVertices(q)
-        q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, step) =>
-            fetchStepFuture(q, acc)
-//          fetchStepFuture(q, acc).map { stepResults =>
-//            step.queryParams.zip(stepResults).foreach { case (qParam, queryRequestWithResult)  =>
-//              val cursor = Base64.getEncoder.encodeToString(queryRequestWithResult.queryResult.tailCursor)
-//              qParam.cursorOpt = Option(cursor)
-//            }
-//            stepResults
-//          }
+        val filterOutFuture = q.queryOption.filterOutQuery match {
+          case None => innerFallback
+          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+        }
+        for {
+          innerResult <- getEdgesStepInner(q)
+          filterOutInnerResult <- filterOutFuture
+        } yield {
+          val result = StepResult(graph, q.queryOption, innerResult)
+          if (filterOutInnerResult.isEmpty) result
+          else {
+            StepResult.filterOut(graph, q.queryOption, result, filterOutInnerResult)
+          }
         }
       }
     } recover {
@@ -1220,7 +1227,34 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     } get
   }
 
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = {
+  /** Runs each query in `mq`, combines the results via `StepResult.merges` with `mq.weights`, then
+   *  applies the optional filterOut query. Synchronous Exceptions are logged and yield the shared `fallback`. */
+  def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
+    Try {
+      if (mq.queries.isEmpty) fallback
+      else {
+        val filterOutFuture = mq.queryOption.filterOutQuery match {
+          case None => innerFallback
+          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+        }
+
+        val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
+        for {
+          multiQueryResults <- multiQueryFutures
+          filterOutInnerResult <- filterOutFuture
+        } yield {
+          val merged = StepResult.merges(mq.queryOption, multiQueryResults, mq.weights)
+          StepResult.filterOut(graph, mq.queryOption, merged, filterOutInnerResult)
+        }
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesMultiQuery: $e", e)
+        fallback
+    } get
+  }
+
+  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = {
     val ts = System.currentTimeMillis()
     val futures = for {
       (srcVertex, tgtVertex, queryParam) <- params
@@ -1228,15 +1262,18 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
       edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
     } yield {
         fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
-          val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
-          val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
-          val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
-          val queryResult = QueryResult(edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0)))
-          QueryRequestWithResult(queryRequest, queryResult)
+          edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0))
         }
       }
 
-    Future.sequence(futures)
+    Future.sequence(futures).map { edgeWithScoreLs =>
+      val s2EdgeWithScoreLs = edgeWithScoreLs.flatMap { ls =>
+        ls.map { edgeWithScore =>
+          S2EdgeWithScore(edgeWithScore.edge, edgeWithScore.score)
+        }
+      }
+      StepResult(results = s2EdgeWithScoreLs, grouped = Nil, degreeEdges = Nil)
+    }
   }
 
 
@@ -1266,6 +1303,13 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     }
 
   }
+
+  /** Rescales scores so they sum to 1.0; a zero total is returned unchanged to avoid NaN/Infinity scores. */
+  protected def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+    val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
+    if (sum == 0.0) edgeWithScores
+    else edgeWithScores.map(edgeWithScore => edgeWithScore.copy(score = edgeWithScore.score / sum))
+  }
   /** end of query */
 
   /** Mutation Builder */
@@ -1290,19 +1334,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
       case (true, true) =>
 
-        /* when there is no need to update. shouldUpdate == false */
+        /** when there is no need to update. shouldUpdate == false */
         List.empty
       case (true, false) =>
 
-        /* no edges to delete but there is new edges to insert so increase degree by 1 */
+        /** no edges to delete but there is new edges to insert so increase degree by 1 */
         edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
       case (false, true) =>
 
-        /* no edges to insert but there is old edges to delete so decrease degree by 1 */
+        /** no edges to insert but there is old edges to delete so decrease degree by 1 */
         edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
       case (false, false) =>
 
-        /* update on existing edges so no change on degree */
+        /** update on existing edges so no change on degree */
         List.empty
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 138216b..b52ba53 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -82,8 +82,9 @@ object AsynchbaseStorage {
 }
 
 
-class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext)
-  extends Storage[Deferred[QueryRequestWithResult]](config) {
+class AsynchbaseStorage(override val graph: Graph,
+                        override val config: Config)(implicit ec: ExecutionContext)
+  extends Storage[Deferred[StepInnerResult]](graph, config) {
 
   import Extensions.DeferOps
 
@@ -100,14 +101,16 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
   private val emptyKeyValues = new util.ArrayList[KeyValue]()
   private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
 
-  import CanDefer._
-
   /** Future Cache to squash request */
-  private val futureCache = new DeferCache[QueryResult, Deferred, Deferred](config, QueryResult(), "FutureCache", useMetric = true)
+  private val futureCache = new DeferCache[StepInnerResult, Deferred, Deferred](config, StepInnerResult.Empty, "FutureCache", useMetric = true)
 
   /** Simple Vertex Cache */
   private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
 
+  private val zkQuorum = config.getString("hbase.zookeeper.quorum")
+  private val zkQuorumSlave = // NOTE(review): slave key name is assumed — confirm against the deployed config
+    if (config.hasPath("hbase.slave.zookeeper.quorum")) Option(config.getString("hbase.slave.zookeeper.quorum"))
+    else None
 
   /**
    * fire rpcs into proper hbase cluster using client and
@@ -241,7 +244,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
         if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
 
         scanner.setMaxVersions(1)
-        scanner.setMaxNumRows(queryParam.limit)
+        scanner.setMaxNumRows(queryParam.offset + queryParam.limit)
         scanner.setMaxTimestamp(maxTs)
         scanner.setMinTimestamp(minTs)
         scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
@@ -280,21 +283,38 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
   override def fetch(queryRequest: QueryRequest,
                      prevStepScore: Double,
                      isInnerCall: Boolean,
-                     parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = {
+                     parentEdges: Seq[EdgeWithScore]): Deferred[StepInnerResult] = {
+
+    def fetchInner(hbaseRpc: AnyRef): Deferred[StepInnerResult] = {
+      val queryParam = queryRequest.queryParam
 
-    def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
       fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
-        val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
-        val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
-          sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
-        } else edgeWithScores
-        QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
-//        QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty)))
+        // VERSION4 labels pass offset/limit on to toEdges; other schema versions keep every fetched kv
+        val (startOffset, length) =
+          if (queryParam.label.schemaVersion == HBaseType.VERSION4) (queryParam.offset, queryParam.limit)
+          else (0, kvs.length)
+
+        val edgeWithScores = toEdges(kvs, queryParam, prevStepScore, isInnerCall, parentEdges, startOffset, length)
+        edgeWithScores.headOption match {
+          case None => StepInnerResult.Empty
+          case Some(head) =>
+            // only the first edge is checked for being a degree edge; it is returned apart from the rest
+            val (degreeEdges, indexEdges) =
+              if (head.edge.isDegree) (Seq(head), edgeWithScores.tail) else (Nil, edgeWithScores)
 
+            val normalized =
+              if (queryParam.shouldNormalize) normalize(indexEdges) else indexEdges
+
+            // a negative sample size leaves the edges unsampled
+            val sampled =
+              if (queryParam.sample >= 0) sample(queryRequest, normalized, queryParam.sample)
+              else normalized
+
+            StepInnerResult(edgesWithScoreLs = sampled, degreeEdges)
+        }
       } recoverWith { ex =>
         logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
-        QueryResult(isFailure = true)
-//        QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
+        StepInnerResult.Failure
       }
     }
 
@@ -302,27 +322,25 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
     val cacheTTL = queryParam.cacheTTLInMillis
     val request = buildRequest(queryRequest)
 
-    val defer =
-      if (cacheTTL <= 0) fetchInner(request)
-      else {
-        val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
-        val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
-        futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+    if (cacheTTL <= 0) fetchInner(request)
+    else {
+      val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+      val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+      futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
     }
-    defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)}
   }
 
 
   override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)],
-                       prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[QueryRequestWithResult]] = {
-    val defers: Seq[Deferred[QueryRequestWithResult]] = for {
+                       prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[StepInnerResult]] = {
+    val defers: Seq[Deferred[StepInnerResult]] = for {
       (queryRequest, prevStepScore) <- queryRequestWithScoreLs
       parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
     } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
 
-    val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers)
+    val grouped: Deferred[util.ArrayList[StepInnerResult]] = Deferred.group(defers)
     grouped withCallback {
-      queryResults: util.ArrayList[QueryRequestWithResult] =>
+      queryResults: util.ArrayList[StepInnerResult] =>
         queryResults.toIndexedSeq
     } toFuture
   }
@@ -371,47 +389,56 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
   }
 
 
-  override def createTable(zkAddr: String,
+  override def createTable(_zkAddr: String,
                            tableName: String,
                            cfs: List[String],
                            regionMultiplier: Int,
                            ttl: Option[Int],
                            compressionAlgorithm: String): Unit = {
-    logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
-    val admin = getAdmin(zkAddr)
-    val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
-    try {
-      if (!admin.tableExists(TableName.valueOf(tableName))) {
-        try {
-          val desc = new HTableDescriptor(TableName.valueOf(tableName))
-          desc.setDurability(Durability.ASYNC_WAL)
-          for (cf <- cfs) {
-            val columnDesc = new HColumnDescriptor(cf)
-              .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
-              .setBloomFilterType(BloomType.ROW)
-              .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-              .setMaxVersions(1)
-              .setTimeToLive(2147483647)
-              .setMinVersions(0)
-              .setBlocksize(32768)
-              .setBlockCacheEnabled(true)
-            if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
-            desc.addFamily(columnDesc)
-          }
+    /** TODO: Decide if we will allow each app server to connect to multiple hbase cluster */
+    // NOTE: _zkAddr is not read; the table is created on every configured quorum instead.
+    for {
+      zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
+    } {
+      logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
+      val admin = getAdmin(zkAddr)
+      try {
+        // read the cluster size inside the try so a failure here still closes admin in the finally
+        val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+        if (!admin.tableExists(TableName.valueOf(tableName))) {
+          val desc = new HTableDescriptor(TableName.valueOf(tableName))
+          desc.setDurability(Durability.ASYNC_WAL)
+          for (cf <- cfs) {
+            val columnDesc = new HColumnDescriptor(cf)
+              .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+              .setBloomFilterType(BloomType.ROW)
+              .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+              .setMaxVersions(1)
+              .setTimeToLive(2147483647)
+              .setMinVersions(0)
+              .setBlocksize(32768)
+              .setBlockCacheEnabled(true)
+            // an explicit ttl replaces the Int.MaxValue default set in the chain above
+            if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+            desc.addFamily(columnDesc)
+          }
 
-          if (regionCount <= 1) admin.createTable(desc)
-          else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
-        } catch {
-          case e: Throwable =>
-            logger.error(s"$zkAddr, $tableName failed with $e", e)
-            throw e
+          // explicit start/end keys and a region count are passed only when more than one region is wanted
+          if (regionCount <= 1) admin.createTable(desc)
+          else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+        } else {
+          logger.info(s"$zkAddr, $tableName, $cfs already exist.")
         }
-      } else {
-        logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+      } catch {
+        // the only handler for this attempt, so each failure is logged once before it is rethrown
+        case e: Throwable =>
+          logger.error(s"$zkAddr, $tableName failed with $e", e)
+          throw e
+      } finally {
+        // runs on success and on failure alike
+        admin.close()
+        admin.getConnection.close()
       }
-    } finally {
-      admin.close()
-      admin.getConnection.close()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index 83d4338..c700e53 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -63,5 +63,5 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
     else if (indexEdge.op == GraphUtil.operations("incrementCount"))
       Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
     else propsToKeyValues(indexEdge.metas.toSeq)
-  
+
  }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
index 4149540..b402c0f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
@@ -51,6 +51,8 @@ object logger {
   private val logger = LoggerFactory.getLogger("application")
   private val errorLogger = LoggerFactory.getLogger("error")
   private val metricLogger = LoggerFactory.getLogger("metrics")
+  private val queryLogger = LoggerFactory.getLogger("query")
+  private val malformedLogger = LoggerFactory.getLogger("malformed")
 
   def metric[T: Loggable](msg: => T) = metricLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
 
@@ -61,6 +63,10 @@ object logger {
   def error[T: Loggable](msg: => T, exception: => Throwable) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
 
   def error[T: Loggable](msg: => T) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg))
+
+  def query[T: Loggable](msg: => T) = queryLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
+
+  def malformed[T: Loggable](msg: => T, exception: => Throwable) = malformedLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
index a018c01..6933320 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
@@ -19,10 +19,12 @@
 
 package org.apache.s2graph.core
 
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
 import org.apache.s2graph.core.utils.logger
 import org.scalatest.FunSuite
+import play.api.libs.json.{JsObject, Json}
 
 class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
   initTests()
@@ -39,7 +41,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
     val (srcId, tgtId, labelName) = ("1", "2", testLabelName)
 
     val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield {
-      Management.toEdge(ts.toLong, op, srcId, tgtId, labelName, "out", props).toLogString
+      val properties = fromJsonToProperties(Json.parse(props).as[JsObject])
+      Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, op).toLogString
     }).mkString("\n")
 
     val expected = Seq(


[3/3] incubator-s2graph git commit: [S2GRAPH-121]: Create `Result` class to hold traverse result edges.

Posted by st...@apache.org.
[S2GRAPH-121]: Create `Result` class to hold traverse result edges.

JIRA:
  [S2GRAPH-121] https://issues.apache.org/jira/browse/S2GRAPH-121

Pull Request:
  Closes #96

Authors
  DO YUNG YOON: steamshon@apache.org


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/8dbb9a3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/8dbb9a3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/8dbb9a3e

Branch: refs/heads/master
Commit: 8dbb9a3eeff6f412fe5defee2baffee32f174981
Parents: b590831
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Nov 16 16:56:41 2016 +0100
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Nov 16 17:01:07 2016 +0100

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/s2graph/core/mysqls/schema.sql   |   1 +
 .../scala/org/apache/s2graph/core/Edge.scala    |  82 ++-
 .../apache/s2graph/core/ExceptionHandler.scala  |  19 +-
 .../scala/org/apache/s2graph/core/Graph.scala   | 232 ++++---
 .../apache/s2graph/core/GraphExceptions.scala   |   2 +
 .../org/apache/s2graph/core/Management.scala    |  69 +--
 .../org/apache/s2graph/core/OrderingUtil.scala  |   7 +
 .../org/apache/s2graph/core/PostProcess.scala   | 601 ++++---------------
 .../org/apache/s2graph/core/QueryParam.scala    |  75 ++-
 .../org/apache/s2graph/core/QueryResult.scala   | 234 +++++++-
 .../scala/org/apache/s2graph/core/Vertex.scala  |  74 ++-
 .../apache/s2graph/core/mysqls/Experiment.scala |   3 +-
 .../org/apache/s2graph/core/mysqls/Label.scala  |  59 +-
 .../s2graph/core/mysqls/ServiceColumn.scala     |  31 +-
 .../s2graph/core/parsers/WhereParser.scala      |   2 +-
 .../s2graph/core/rest/RequestParser.scala       | 177 ++++--
 .../apache/s2graph/core/rest/RestHandler.scala  | 182 +++---
 .../apache/s2graph/core/storage/Storage.scala   | 336 ++++++-----
 .../core/storage/hbase/AsynchbaseStorage.scala  | 147 +++--
 .../indexedge/wide/IndexEdgeSerializable.scala  |   2 +-
 .../org/apache/s2graph/core/utils/Logger.scala  |   6 +
 .../org/apache/s2graph/core/EdgeTest.scala      |   5 +-
 .../s2graph/core/Integrate/CrudTest.scala       | 163 +++--
 .../core/Integrate/IntegrateCommon.scala        |  10 +-
 .../s2graph/core/Integrate/QueryTest.scala      | 159 ++---
 .../core/Integrate/StrongLabelDeleteTest.scala  |  14 +-
 .../core/Integrate/VertexTestHelper.scala       |   7 +-
 .../core/Integrate/WeakLabelDeleteTest.scala    |   9 +-
 .../apache/s2graph/core/JsonParserTest.scala    |   3 +-
 .../apache/s2graph/core/QueryParamTest.scala    |  56 +-
 .../s2graph/core/TestCommonWithModels.scala     |   2 +-
 .../core/benchmark/JsonBenchmarkSpec.scala      |  62 +-
 .../core/benchmark/SamplingBenchmarkSpec.scala  | 207 +++----
 .../org/apache/s2graph/rest/netty/Server.scala  |  96 ++-
 .../apache/s2graph/rest/play/Bootstrap.scala    |   4 +-
 .../s2graph/rest/play/config/Config.scala       |   4 +
 .../controllers/ApplicationController.scala     |   6 +-
 .../play/controllers/CounterController.scala    |  21 +-
 .../rest/play/controllers/EdgeController.scala  | 164 +++--
 .../play/controllers/ExperimentController.scala |   5 +-
 .../play/controllers/PublishController.scala    |   8 -
 .../rest/play/controllers/QueryController.scala |  21 +-
 .../play/controllers/VertexController.scala     |  18 +-
 s2rest_play/conf/reference.conf                 |   1 +
 s2rest_play/conf/routes                         |   7 +-
 46 files changed, 1892 insertions(+), 1503 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0c8edc3..73b3597 100644
--- a/CHANGES
+++ b/CHANGES
@@ -94,6 +94,8 @@ Release 0.1.0 - unreleased
     
     S2GRAPH-127: Refactor ExceptionHander Object into Class (Committed by DOYUNG YOON).
 
+    S2GRAPH-121: Create `Result` class to hold traverse result edges (Committed by DOYUNG YOON).
+
   BUG FIXES
 
     S2GRAPH-18: Query Option "interval" is Broken. 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
index d8ee5bc..822df6c 100644
--- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
+++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
@@ -96,6 +96,7 @@ CREATE TABLE `labels` (
   `is_async` tinyint(4) NOT NULL default '0',
   `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4',
   `options` text,
+  `deleted_at` datetime DEFAULT NULL,
   PRIMARY KEY (`id`),
   UNIQUE KEY `ux_label` (`label`),
   INDEX `idx_labels_src_column_name` (`src_column_name`),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 4d4afe0..97730f3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -19,10 +19,11 @@
 
 package org.apache.s2graph.core
 
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.JSONParser._
 import play.api.libs.json.{JsNumber, Json}
 import scala.collection.JavaConversions._
 import scala.util.hashing.MurmurHash3
@@ -112,7 +113,7 @@ case class IndexEdge(srcVertex: Vertex,
   lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
   lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v.innerVal
 
-  //  lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
+//  lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
 
   //TODO:
   //  lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList
@@ -151,8 +152,23 @@ case class Edge(srcVertex: Vertex,
   val schemaVer = label.schemaVersion
   val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong
 
+  lazy val srcId = srcVertex.innerIdVal
+  lazy val tgtId = tgtVertex.innerIdVal
+  lazy val labelName = label.label
+  lazy val direction = GraphUtil.fromDirection(labelWithDir.dir)
+  lazy val properties = toProps()
+
   def props = propsWithTs.mapValues(_.innerVal)
 
+
+  /** One entry per key of label.metaPropsDefaultMapInner: meta name -> stored value, else that map's default. */
+  private def toProps(): Map[String, Any] = {
+    label.metaPropsDefaultMapInner.map { case (meta, fallback) =>
+      val stored = propsWithTs.getOrElse(meta.seq, fallback)
+      meta.name -> stored.innerVal.value
+    }
+  }
+
   def relatedEdges = {
     if (labelWithDir.isDirected) {
       val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
@@ -204,10 +220,10 @@ case class Edge(srcVertex: Vertex,
 
   def isDegree = propsWithTs.contains(LabelMeta.degreeSeq)
 
-  //  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
-  //    case Some(_) => props
-  //    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
-  //  }
+//  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
+//    case Some(_) => props
+//    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
+//  }
 
   def propsPlusTsValid = propsWithTs.filter(kv => kv._1 >= 0)
 
@@ -290,8 +306,31 @@ case class Edge(srcVertex: Vertex,
 
     ret.mkString("\t")
   }
+
+  /** Resolves each requested column, in order: reserved names map to edge fields, the rest to `properties`. */
+  def selectValues(selectColumns: Seq[String],
+                   useToString: Boolean = true,
+                   score: Double = 0.0): Seq[Option[Any]] = {
+    //TODO: Option should be matched in JsonParser anyTo*
+    def lookup(column: String): Option[Any] = column match {
+      case LabelMeta.from.name | "from" => Option(srcId)
+      case LabelMeta.to.name | "to" => Option(tgtId)
+      case "label" => Option(labelName)
+      case "direction" => Option(direction)
+      case "score" => Option(score)
+      case LabelMeta.timestamp.name | "timestamp" => Option(ts)
+      case _ => properties.get(column)
+    }
+
+    // useToString renders every found value through toString; a column that is not found stays None
+    selectColumns.map { column =>
+      val found = lookup(column)
+      if (useToString) found.map(_.toString) else found
+    }
+  }
 }
 
+
 case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
                       edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
                       newSnapshotEdge: Option[SnapshotEdge] = None) {
@@ -310,6 +349,33 @@ object Edge {
   val incrementVersion = 1L
   val minTsVal = 0L
 
+  /** Builds an [[Edge]] from raw ids, a label name and string-typed direction/operation. */
+  def toEdge(srcId: Any,
+             tgtId: Any,
+             labelName: String,
+             direction: String,
+             props: Map[String, Any] = Map.empty,
+             ts: Long = System.currentTimeMillis(),
+             operation: String = "insert"): Edge = {
+    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+
+    // vertex ids go through toString and are converted with each side's column type
+    val srcInnerId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion)
+    val tgtInnerId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion)
+    val (srcColId, tgtColId) = (label.srcColumn.id.get, label.tgtColumn.id.get)
+
+    val srcVertex = Vertex(SourceVertexId(srcColId, srcInnerId), System.currentTimeMillis())
+    val tgtVertex = Vertex(TargetVertexId(tgtColId, tgtInnerId), System.currentTimeMillis())
+    val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+
+    val labelWithDir = LabelWithDirection(label.id.get, dir)
+    // ts is always written under the timestamp meta name, replacing any value the caller put there
+    val propsWithTs = label.propsToInnerValsWithTs(props ++ Map(LabelMeta.timestamp.name -> ts), ts)
+    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+    new Edge(srcVertex, tgtVertex, labelWithDir, op = op, version = ts, propsWithTs = propsWithTs)
+  }
+
   /** now version information is required also **/
   type State = Map[Byte, InnerValLikeWithTs]
   type PropsPairWithTs = (State, State, Long, String)
@@ -387,7 +453,7 @@ object Edge {
 
       //      logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
       //      logger.error(s"$propsWithTs")
-      (requestEdge, edgeMutate)
+      (requestEdge.copy(propsWithTs = propsWithTs), edgeMutate)
     }
   }
 
@@ -582,7 +648,7 @@ object Edge {
     (propsWithTs, true)
   }
 
-  def fromString(s: String): Option[Edge] = Graph.toEdge(s)
+//  def fromString(s: String): Option[Edge] = Graph.toEdge(s)
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index d03d483..48976d3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -24,8 +24,11 @@ import java.util.Properties
 import com.typesafe.config.Config
 import org.apache.kafka.clients.producer._
 import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.JsValue
 
 class ExceptionHandler(config: Config) {
+
+
   import ExceptionHandler._
 
   val keyBrokerList = "kafka.metadata.broker.list"
@@ -43,7 +46,6 @@ class ExceptionHandler(config: Config) {
       }
     } else None
 
-
   def enqueue(m: KafkaMessage): Unit = {
     producer match {
       case None => logger.debug(s"skip log to Kafka: ${m}")
@@ -68,24 +70,33 @@ object ExceptionHandler {
   type Key = String
   type Val = String
 
-  def toKafkaMessage(topic: String, element: GraphElement, originalString: Option[String] = None) = {
+  def toKafkaMessage(topic: String,
+                     element: GraphElement,
+                     originalString: Option[String] = None,
+                     produceJson: Boolean = false) = {
+    // payload: the caller's original string, else the element's log string (produceJson is not read here)
+    val payload = originalString.getOrElse(element.toLogString())
+
     KafkaMessage(
       new ProducerRecord[Key, Val](
         topic,
         element.queuePartitionKey,
-        originalString.getOrElse(element.toLogString())))
+        payload))
   }
 
+  // only used in deleteAll
   def toKafkaMessage(topic: String, tsv: String) = {
     KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
   }
 
+  def toKafkaMessage(topic: String, jsValue: JsValue): KafkaMessage = toKafkaMessage(topic, jsValue.toString())
+
   case class KafkaMessage(msg: ProducerRecord[Key, Val])
 
   private def toKafkaProp(config: Config) = {
     val props = new Properties()
 
-    /* all default configuration for new producer */
+    /** all default configuration for new producer */
     val brokers =
       if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
       else "localhost"

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index c25b71a..2cbddea 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -23,11 +23,13 @@ import java.util
 import java.util.concurrent.ConcurrentHashMap
 
 import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{Label, Model}
 import org.apache.s2graph.core.parsers.WhereParser
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
 import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.{JsObject, Json}
 
 import scala.collection.JavaConversions._
 import scala.collection._
@@ -58,6 +60,7 @@ object Graph {
     "back.off.timeout" -> java.lang.Integer.valueOf(1000),
     "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
     "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
+    "delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
     "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
     "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
     "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
@@ -84,10 +87,9 @@ object Graph {
     (hashKey, filterHashKey)
   }
 
-  def alreadyVisitedVertices(queryResultLs: Seq[QueryResult]): Map[(LabelWithDirection, Vertex), Boolean] = {
+  def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, Vertex), Boolean] = {
     val vertices = for {
-      queryResult <- queryResultLs
-      edgeWithScore <- queryResult.edgeWithScoreLs
+      edgeWithScore <- edgeWithScoreLs
       edge = edgeWithScore.edge
       vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
     } yield (edge.labelWithDir, vertex) -> true
@@ -132,81 +134,36 @@ object Graph {
     tsVal
   }
 
-  def aggregateScore(newScore: Double,
-                     resultEdges: ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
-                     duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
-                     edgeWithScoreSorted: ListBuffer[(HashKey, FilterHashKey, Edge, Double)],
-                     hashKey: HashKey,
-                     filterHashKey: FilterHashKey,
-                     queryParam: QueryParam,
-                     convertedEdge: Edge) = {
-
-    /* skip duplicate policy check if consistencyLevel is strong */
-    if (queryParam.label.consistencyLevel != "strong" && resultEdges.containsKey(hashKey)) {
-      val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey)
+  def processDuplicates(queryParam: QueryParam,
+                        duplicates: Seq[(FilterHashKey, EdgeWithScore)]): Seq[(FilterHashKey, EdgeWithScore)] = {
+    // Collapses one group of duplicates per duplicatePolicy ("strong" labels pass through); empty in, empty out.
+    if (queryParam.label.consistencyLevel != "strong") {
       //TODO:
       queryParam.duplicatePolicy match {
-        case Query.DuplicatePolicy.First => // do nothing
-        case Query.DuplicatePolicy.Raw =>
-          if (duplicateEdges.containsKey(hashKey)) {
-            duplicateEdges.get(hashKey).append(convertedEdge -> newScore)
-          } else {
-            val newBuffer = new ListBuffer[(Edge, Double)]
-            newBuffer.append(convertedEdge -> newScore)
-            duplicateEdges.put(hashKey, newBuffer)
-          }
+        case Query.DuplicatePolicy.First => duplicates.take(1)
+        case Query.DuplicatePolicy.Raw => duplicates
         case Query.DuplicatePolicy.CountSum =>
-          resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 1))
+          val countSum = duplicates.size
+          duplicates.take(1).map { case (key, first) => key -> first.copy(score = countSum) }
         case _ =>
-          resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + newScore))
+          val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + current._2.score }
+          duplicates.take(1).map { case (key, first) => key -> first.copy(score = scoreSum) }
       }
     } else {
-      resultEdges.put(hashKey, (filterHashKey, convertedEdge, newScore))
-      edgeWithScoreSorted.append((hashKey, filterHashKey, convertedEdge, newScore))
+      duplicates
     }
   }
-
-  def aggregateResults(queryRequestWithResult: QueryRequestWithResult,
-                       queryParamResult: Result,
-                       edgesToInclude: util.HashSet[FilterHashKey],
-                       edgesToExclude: util.HashSet[FilterHashKey]): QueryRequestWithResult = {
-    val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-    val (query, stepIdx, _, queryParam) = QueryRequest.unapply(queryRequest).get
-
-    val (duplicateEdges, resultEdges, edgeWithScoreSorted) = queryParamResult
-    val edgesWithScores = for {
-      (hashKey, filterHashKey, edge, _) <- edgeWithScoreSorted if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
-      score = resultEdges.get(hashKey)._3
-      (duplicateEdge, aggregatedScore) <- fetchDuplicatedEdges(edge, score, hashKey, duplicateEdges) if aggregatedScore >= queryParam.threshold
-    } yield EdgeWithScore(duplicateEdge, aggregatedScore)
-
-    QueryRequestWithResult(queryRequest, QueryResult(edgesWithScores))
-  }
-
-  def fetchDuplicatedEdges(edge: Edge,
-                           score: Double,
-                           hashKey: HashKey,
-                           duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]) = {
-    (edge -> score) +: (if (duplicateEdges.containsKey(hashKey)) duplicateEdges.get(hashKey) else Seq.empty)
-  }
-
-  def queryResultWithFilter(queryRequestWithResult: QueryRequestWithResult) = {
-    val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-    val (_, _, _, queryParam) = QueryRequest.unapply(queryRequest).get
-    val whereFilter = queryParam.where.get
-    if (whereFilter == WhereParser.success) queryResult.edgeWithScoreLs
-    else queryResult.edgeWithScoreLs.withFilter(edgeWithScore => whereFilter.filter(edgeWithScore.edge))
-  }
-
-  def filterEdges(queryResultLsFuture: Future[Seq[QueryRequestWithResult]],
+  def filterEdges(q: Query,
+                  stepIdx: Int,
+                  queryRequests: Seq[QueryRequest],
+                  queryResultLsFuture: Future[Seq[StepInnerResult]],
+                  queryParams: Seq[QueryParam],
                   alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean])
-                 (implicit ec: scala.concurrent.ExecutionContext): Future[Seq[QueryRequestWithResult]] = {
+                 (implicit ec: scala.concurrent.ExecutionContext): Future[StepInnerResult] = {
 
     queryResultLsFuture.map { queryRequestWithResultLs =>
-      if (queryRequestWithResultLs.isEmpty) Nil
+      if (queryRequestWithResultLs.isEmpty) StepInnerResult.Empty
       else {
-        val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get
-        val (q, stepIdx, srcVertex, queryParam) = QueryRequest.unapply(queryRequest).get
         val step = q.steps(stepIdx)
 
         val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None
@@ -219,73 +176,114 @@ object Graph {
         val edgesToExclude = new util.HashSet[FilterHashKey]()
         val edgesToInclude = new util.HashSet[FilterHashKey]()
 
-        val queryParamResultLs = new ListBuffer[Result]
-        queryRequestWithResultLs.foreach { queryRequestWithResult =>
-          val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+        val sequentialLs = new ListBuffer[(HashKey, FilterHashKey, EdgeWithScore)]()
+        val agg = new mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, EdgeWithScore)]]()
+        val params = new mutable.HashMap[HashKey, QueryParam]()
+        var numOfDuplicates = 0
+        var numOfTotal = 0
+        queryRequests.zip(queryRequestWithResultLs).foreach { case (queryRequest, stepInnerResult) =>
           val queryParam = queryRequest.queryParam
-          val duplicateEdges = new util.concurrent.ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]()
-          val resultEdges = new util.concurrent.ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)]()
-          val edgeWithScoreSorted = new ListBuffer[(HashKey, FilterHashKey, Edge, Double)]
           val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
-
-          // store degree value with Array.empty so if degree edge exist, it comes at very first.
-          def checkDegree() = queryResult.edgeWithScoreLs.headOption.exists { edgeWithScore =>
-            edgeWithScore.edge.isDegree
-          }
-          var isDegree = checkDegree()
-
           val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir
           val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey)
           val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey)
-
-          queryResultWithFilter(queryRequestWithResult).foreach { edgeWithScore =>
-            val (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+          val where = queryParam.where.get
+
+          for {
+            edgeWithScore <- stepInnerResult.edgesWithScoreLs
+            if where == WhereParser.success || where.filter(edgeWithScore.edge)
+            (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+          } {
+            numOfTotal += 1
             if (queryParam.transformer.isDefault) {
               val convertedEdge = edge
 
-              val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+              val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
 
-              /* check if this edge should be exlcuded. */
-              if (shouldBeExcluded && !isDegree) {
+              /** check if this edge should be excluded. */
+              if (shouldBeExcluded) {
                 edgesToExclude.add(filterHashKey)
               } else {
-                if (shouldBeIncluded && !isDegree) {
+                if (shouldBeIncluded) {
                   edgesToInclude.add(filterHashKey)
                 }
                 val tsVal = processTimeDecay(queryParam, convertedEdge)
                 val newScore = labelWeight * score * tsVal
-                aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+                val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
+                sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
+                agg.get(hashKey) match {
+                  case None =>
+                    val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
+                    newLs += (filterHashKey -> newEdgeWithScore)
+                    agg += (hashKey -> newLs)
+                  case Some(old) =>
+                    numOfDuplicates += 1
+                    old += (filterHashKey -> newEdgeWithScore)
+                }
+                params += (hashKey -> queryParam)
               }
             } else {
               convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge =>
-                val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+                val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
 
-                /* check if this edge should be exlcuded. */
-                if (shouldBeExcluded && !isDegree) {
+                /** check if this edge should be excluded. */
+                if (shouldBeExcluded) {
                   edgesToExclude.add(filterHashKey)
                 } else {
-                  if (shouldBeIncluded && !isDegree) {
+                  if (shouldBeIncluded) {
                     edgesToInclude.add(filterHashKey)
                   }
                   val tsVal = processTimeDecay(queryParam, convertedEdge)
                   val newScore = labelWeight * score * tsVal
-                  aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+                  val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
+                  sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
+                  agg.get(hashKey) match {
+                    case None =>
+                      val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
+                      newLs += (filterHashKey -> newEdgeWithScore)
+                      agg += (hashKey -> newLs)
+                    case Some(old) =>
+                      numOfDuplicates += 1
+                      old += (filterHashKey -> newEdgeWithScore)
+                  }
+                  params += (hashKey -> queryParam)
                 }
               }
             }
-            isDegree = false
           }
-          val ret = (duplicateEdges, resultEdges, edgeWithScoreSorted)
-          queryParamResultLs.append(ret)
         }
 
-        val aggregatedResults = for {
-          (queryRequestWithResult, queryParamResult) <- queryRequestWithResultLs.zip(queryParamResultLs)
-        } yield {
-            aggregateResults(queryRequestWithResult, queryParamResult, edgesToInclude, edgesToExclude)
+
+        val edgeWithScoreLs = new ListBuffer[EdgeWithScore]()
+        if (numOfDuplicates == 0) {
+          // no duplicates at all.
+          for {
+            (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
+            if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+          } {
+            edgeWithScoreLs += edgeWithScore
           }
+        } else {
+          // need to resolve duplicates.
+          val seen = new mutable.HashSet[HashKey]()
+          for {
+            (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
+            if !seen.contains(hashKey)
+            if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+          } {
+            val queryParam = params(hashKey)
+            val duplicates = processDuplicates(queryParam, agg(hashKey))
+            duplicates.foreach { case (_, duplicate) =>
+              if (duplicate.score >= queryParam.threshold) {
+                seen += hashKey
+                edgeWithScoreLs += duplicate
+              }
+            }
+          }
+        }
 
-        aggregatedResults
+        val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
+        StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, degreeEdges = degrees)
       }
     }
   }
@@ -310,7 +308,7 @@ object Graph {
     element
   } recover {
     case e: Exception =>
-      logger.error(s"$e", e)
+      logger.error(s"[toElement]: $e", e)
       None
   } get
 
@@ -323,37 +321,33 @@ object Graph {
     toEdge(GraphUtil.split(s))
   }
 
-  //"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}"
-  //{"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":1417616431},
   def toEdge(parts: Array[String]): Option[Edge] = Try {
     val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) parts(6) else "{}"
+    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
     val tempDirection = if (parts.length >= 8) parts(7) else "out"
     val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
-
-    val edge = Management.toEdge(ts.toLong, operation, srcId, tgtId, label, direction, props)
-    //            logger.debug(s"toEdge: $edge")
-    Some(edge)
+    val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
+    Option(edge)
   } recover {
     case e: Exception =>
-      logger.error(s"toEdge: $e", e)
+      logger.error(s"[toEdge]: $e", e)
       throw e
   } get
 
-  //"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}"
   def toVertex(parts: Array[String]): Option[Vertex] = Try {
     val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) parts(6) else "{}"
-    Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, colName, props))
+    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
+    val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
+    Option(vertex)
   } recover {
     case e: Throwable =>
-      logger.error(s"toVertex: $e", e)
+      logger.error(s"[toVertex]: $e", e)
       throw e
   } get
 
-  def initStorage(config: Config)(ec: ExecutionContext) = {
+  def initStorage(graph: Graph, config: Config)(ec: ExecutionContext) = {
     config.getString("s2graph.storage.backend") match {
-      case "hbase" => new AsynchbaseStorage(config)(ec)
+      case "hbase" => new AsynchbaseStorage(graph, config)(ec)
       case _ => throw new RuntimeException("not supported storage.")
     }
   }
@@ -366,7 +360,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
   Model.loadCache()
 
   // TODO: Make storage client by config param
-  val storage = Graph.initStorage(config)(ec)
+  val storage = Graph.initStorage(this, config)(ec)
 
 
   for {
@@ -375,9 +369,11 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
   } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
 
   /** select */
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = storage.checkEdges(params)
+  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = storage.checkEdges(params)
+
+  def getEdges(q: Query): Future[StepResult] = storage.getEdges(q)
 
-  def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = storage.getEdges(q)
+  def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = storage.getEdgesMultiQuery(mq)
 
   def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
index 0898ffa..2f090cf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
@@ -31,6 +31,8 @@ object GraphExceptions {
 
   case class LabelAlreadyExistException(msg: String) extends Exception(msg)
 
+  case class LabelNameTooLongException(msg: String) extends Exception(msg)
+
   case class InternalException(msg: String) extends Exception(msg)
 
   case class IllegalDataTypeException(msg: String) extends Exception(msg)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 39020c7..9ef7c14 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core
 
-import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
+import org.apache.s2graph.core.GraphExceptions.{LabelNameTooLongException, InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.types.HBaseType._
@@ -48,9 +48,9 @@ object Management {
 
   import HBaseType._
 
+  val LABEL_NAME_MAX_LENGTH = 100
   val DefaultCompressionAlgorithm = "gz"
 
-
   def findService(serviceName: String) = {
     Service.findByName(serviceName, useCache = false)
   }
@@ -190,62 +190,6 @@ object Management {
     }
   }
 
-  def toEdge(ts: Long, operation: String, srcId: String, tgtId: String,
-             labelStr: String, direction: String = "", props: String): Edge = {
-
-    val label = tryOption(labelStr, getServiceLabel)
-    val dir =
-      if (direction == "")
-//        GraphUtil.toDirection(label.direction)
-        GraphUtil.directions("out")
-      else
-        GraphUtil.toDirection(direction)
-
-    //    logger.debug(s"$srcId, ${label.srcColumnWithDir(dir)}")
-    //    logger.debug(s"$tgtId, ${label.tgtColumnWithDir(dir)}")
-
-    val srcVertexId = toInnerVal(srcId, label.srcColumn.columnType, label.schemaVersion)
-    val tgtVertexId = toInnerVal(tgtId, label.tgtColumn.columnType, label.schemaVersion)
-
-    val srcColId = label.srcColumn.id.get
-    val tgtColId = label.tgtColumn.id.get
-    val (srcVertex, tgtVertex) = if (dir == GraphUtil.directions("out")) {
-      (Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()),
-        Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()))
-    } else {
-      (Vertex(SourceVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()),
-        Vertex(TargetVertexId(srcColId, srcVertexId), System.currentTimeMillis()))
-    }
-
-    //    val dir = if (direction == "") GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction)
-    val labelWithDir = LabelWithDirection(label.id.get, dir)
-    val op = tryOption(operation, GraphUtil.toOp)
-
-    val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
-    val parsedProps = toProps(label, jsObject.fields).toMap
-    val propsWithTs = parsedProps.map(kv => (kv._1 -> InnerValLikeWithTs(kv._2, ts))) ++
-      Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, label.schemaVersion), ts))
-
-    Edge(srcVertex, tgtVertex, labelWithDir, op, version = ts, propsWithTs = propsWithTs)
-
-  }
-
-  def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = {
-    Service.findByName(serviceName) match {
-      case None => throw new RuntimeException(s"$serviceName does not exist. create service first.")
-      case Some(service) =>
-        ServiceColumn.find(service.id.get, columnName) match {
-          case None => throw new RuntimeException(s"$columnName is not exist. create service column first.")
-          case Some(col) =>
-            val idVal = toInnerVal(id, col.columnType, col.schemaVersion)
-            val op = tryOption(operation, GraphUtil.toOp)
-            val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
-            val parsedProps = toProps(col, jsObject).toMap
-            Vertex(VertexId(col.id.get, idVal), ts, parsedProps, op = op)
-        }
-    }
-  }
-
   def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = {
 
     val props = for {
@@ -346,7 +290,7 @@ class Management(graph: Graph) {
                   schemaVersion: String = DEFAULT_VERSION,
                   isAsync: Boolean,
                   compressionAlgorithm: String = "gz",
-                  options: Option[String] = None): Try[Label] = {
+                  options: Option[String]): Try[Label] = {
 
     val labelOpt = Label.findByName(label, useCache = false)
 
@@ -355,7 +299,8 @@ class Management(graph: Graph) {
         case Some(l) =>
           throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
         case None =>
-          /* create all models */
+          /** create all models */
+          if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : ${LABEL_NAME_MAX_LENGTH} )")
           val newLabel = Label.insertAll(label,
             srcServiceName, srcColumnName, srcColumnType,
             tgtServiceName, tgtColumnName, tgtColumnType,
@@ -387,8 +332,8 @@ class Management(graph: Graph) {
    * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
    */
   def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
-    val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists."))
-    if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
+    val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
+    if (Label.findByName(newLabelName, useCache = false).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
 
     val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
     val allIndices = old.indices.map { index => Index(index.name, index.propNames) }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
index 0ecbf4e..eaadb39 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
@@ -49,8 +49,15 @@ object OrderingUtil {
         case (xv: Long, yv: Long) => implicitly[Ordering[Long]].compare(xv, yv)
         case (xv: Double, yv: Double) => implicitly[Ordering[Double]].compare(xv, yv)
         case (xv: String, yv: String) => implicitly[Ordering[String]].compare(xv, yv)
+        case (xv: BigDecimal, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(xv, yv)
         case (xv: JsValue, yv: JsValue) => implicitly[Ordering[JsValue]].compare(xv, yv)
         case (xv: InnerValLike, yv: InnerValLike) => implicitly[Ordering[InnerValLike]].compare(xv, yv)
+        case (xv: BigDecimal, yv: Long) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+        case (xv: Long, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
+        case (xv: BigDecimal, yv: Int) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+        case (xv: Int, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
+        case (xv: BigDecimal, yv: Double) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+        case (xv: Double, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 9ab08b3..7cc2420 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -20,11 +20,13 @@
 package org.apache.s2graph.core
 
 import org.apache.s2graph.core.GraphExceptions.BadQueryException
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
-import play.api.libs.json.{Json, _}
 import org.apache.s2graph.core.JSONParser._
-import scala.collection.mutable.ListBuffer
+import play.api.libs.json.{Json, _}
+
+import scala.collection.immutable
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 object PostProcess {
 
@@ -32,6 +34,7 @@ object PostProcess {
   type EDGE_VALUES = Map[String, JsValue]
   type ORDER_BY_VALUES =  (Any, Any, Any, Any)
   type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES)
+  type GROUP_BY_KEY = Map[String, JsValue]
 
   /**
    * Result Entity score field name
@@ -47,523 +50,153 @@ object PostProcess {
   val SCORE_FIELD_NAME = "scoreSum"
   val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props")
 
-  def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
-    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
-    //    filterNot {case (edge, score) => edge.props.contains(LabelMeta.degreeSeq)}
-    val groupedEdgesWithRank = (for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields))
-    } yield {
-      (queryRequest.queryParam, edge, score)
-    }).groupBy {
-      case (queryParam, edge, rank) if edge.labelWithDir.dir == GraphUtil.directions("in") =>
-        (queryParam.label.srcColumn, queryParam.label.label, queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree)
-      case (queryParam, edge, rank) =>
-        (queryParam.label.tgtColumn, queryParam.label.label, queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree)
-    }
 
-    val ret = for {
-      ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) <- groupedEdgesWithRank if !isDegreeEdge
-      edgesWithRanks = edgesAndRanks.groupBy(x => x._2.srcVertex).map(_._2.head)
-      id <- innerValToJsValue(target, tgtColumn.columnType)
-    } yield {
-      Json.obj("name" -> tgtColumn.columnName, "id" -> id,
-        SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum,
-        "label" -> labelName,
-        "aggr" -> Json.obj(
-          "name" -> srcColumn.columnName,
-          "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) =>
-            innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)
-          },
-          "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) =>
-            Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType),
-              "props" -> propsToJson(edge),
-              "score" -> rank
-            )
-          }
-        )
-      )
-    }
-
-    ret.toList
-  }
-
-  def sortWithFormatted(jsons: Seq[JsObject],
-                        scoreField: String = "scoreSum",
-                        queryRequestWithResultLs: Seq[QueryRequestWithResult],
-                        decrease: Boolean = true): JsObject = {
-    val ordering = if (decrease) -1 else 1
-    val sortedJsons = jsons.sortBy { jsObject => (jsObject \ scoreField).as[Double] * ordering }
-
-    if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
-    else Json.obj(
-      "size" -> sortedJsons.size,
-      "results" -> sortedJsons,
-      "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId()
-    )
-  }
-
-  private def toHashKey(edge: Edge, queryParam: QueryParam, fields: Seq[String], delimiter: String = ","): Int = {
-    val ls = for {
-      field <- fields
-    } yield {
-      field match {
-        case "from" | "_from" => edge.srcVertex.innerId.toIdString()
-        case "to" | "_to" => edge.tgtVertex.innerId.toIdString()
-        case "label" => edge.labelWithDir.labelId
-        case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
-        case "_timestamp" | "timestamp" => edge.ts
-        case _ =>
-          queryParam.label.metaPropsInvMap.get(field) match {
-            case None => throw new RuntimeException(s"unknow column: $field")
-            case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match {
-              case None => labelMeta.defaultValue
-              case Some(propVal) => propVal
-            }
-          }
-      }
+  def s2EdgeParent(graph: Graph,
+                   parentEdges: Seq[EdgeWithScore]): JsValue = {
+    if (parentEdges.isEmpty) JsNull
+    else {
+      val ancestors = for {
+        current <- parentEdges
+        parents = s2EdgeParent(graph, current.edge.parentEdges) if parents != JsNull
+      } yield {
+          val s2Edge = current.edge.originalEdgeOpt.getOrElse(current.edge)
+          s2EdgeToJsValue(s2Edge, current.score, false, parents = parents)
+        }
+      Json.toJson(ancestors)
     }
-    val ret = ls.hashCode()
-    ret
-  }
-
-  def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], isSrcVertex: Boolean = false): Seq[Int] = {
-    for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      q = queryRequest.query
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-    } yield  toHashKey(edge, queryRequest.queryParam, q.filterOutFields)
-  }
-
-  def summarizeWithListExcludeFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
-    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
-    sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs, decrease = true)
   }
 
-  def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
-    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
-    sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs)
-  }
-
-  def summarizeWithListFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
-    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
-    sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs)
-  }
-
-  def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult]): JsValue = {
-    toSimpleVertexArrJson(queryRequestWithResultLs, Seq.empty[QueryRequestWithResult])
-  }
-
-  private def orderBy(queryOption: QueryOption,
-                      rawEdges: ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]): ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)] = {
-    import OrderingUtil._
-
-    if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
-      val ascendingLs = queryOption.orderByColumns.map(_._2)
-      rawEdges.sortBy(_._3)(TupleMultiOrdering[Any](ascendingLs))
+  def s2EdgeToJsValue(s2Edge: Edge,
+                      score: Double,
+                      isDegree: Boolean = false,
+                      parents: JsValue = JsNull): JsValue = {
+    if (isDegree) {
+      Json.obj(
+        "from" -> anyValToJsValue(s2Edge.srcId),
+        "label" -> s2Edge.labelName,
+        LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degreeSeq).innerVal.value)
+      )
     } else {
-      rawEdges
+      Json.obj("from" -> anyValToJsValue(s2Edge.srcId),
+        "to" -> anyValToJsValue(s2Edge.tgtId),
+        "label" -> s2Edge.labelName,
+        "score" -> score,
+        "props" -> JSONParser.propertiesToJson(s2Edge.properties),
+        "direction" -> s2Edge.direction,
+        "timestamp" -> anyValToJsValue(s2Edge.ts),
+        "parents" -> parents
+      )
     }
   }
 
-  private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, edge: Edge, column: String): Any = {
-    column match {
-      case "score" => score
-      case "timestamp" | "_timestamp" => edge.ts
-      case _ =>
-        keyWithJs.get(column) match {
-          case None => keyWithJs.get("props").map { js => (js \ column).as[JsValue] }.get
-          case Some(x) => x
-        }
+  def withImpressionId(queryOption: QueryOption,
+                        size: Int,
+                        degrees: Seq[JsValue],
+                        results: Seq[JsValue]): JsValue = {
+    queryOption.impIdOpt match {
+      case None => Json.obj(
+        "size" -> size,
+        "degrees" -> degrees,
+        "results" -> results
+      )
+      case Some(impId) =>
+        Json.obj(
+          "size" -> size,
+          "degrees" -> degrees,
+          "results" -> results,
+          Experiment.ImpressionKey -> impId
+        )
     }
   }
+  def s2VertexToJson(s2Vertex: Vertex): Option[JsValue] = {
+    val props = for {
+      (k, v) <- s2Vertex.properties
+      jsVal <- anyValToJsValue(v)
+    } yield k -> jsVal
 
-  private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): JsValue = {
-    def traverse(js: JsValue): JsValue = js match {
-      case JsNull => mapper(JsNull)
-      case JsNumber(v) => mapper(js)
-      case JsString(v) => mapper(js)
-      case JsBoolean(v) => mapper(js)
-      case JsArray(elements) => JsArray(elements.map { t => traverse(mapper(t)) })
-      case JsObject(values) => JsObject(values.map { case (k, v) => k -> traverse(mapper(v)) })
+    for {
+      id <- anyValToJsValue(s2Vertex.innerIdVal)
+    } yield {
+      Json.obj(
+        "serviceName" -> s2Vertex.serviceName,
+        "columnName" -> s2Vertex.columnName,
+        "id" -> id,
+        "props" -> Json.toJson(props),
+        "timestamp" -> s2Vertex.ts
+      )
     }
-
-    traverse(jsValue)
   }
 
-  /** test query with filterOut is not working since it can not diffrentate filterOut */
-  private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): JsValue = {
-    val cursors = _cursors.flatten.iterator
+  def verticesToJson(s2Vertices: Seq[Vertex]): JsValue =
+    Json.toJson(s2Vertices.flatMap(s2VertexToJson(_)))
 
-    buildReplaceJson(jsonQuery) {
-      case js@JsObject(fields) =>
-        val isStep = fields.find { case (k, _) => k == "label" } // find label group
-        if (isStep.isEmpty) js
-        else {
-          // TODO: Order not ensured
-          val withCursor = js.fieldSet | Set("cursor" -> JsString(cursors.next))
-          JsObject(withCursor.toSeq)
-        }
-      case js => js
-    }
-  }
+  def withOptionalFields(queryOption: QueryOption,
+                         size: Int,
+                         degrees: Seq[JsValue],
+                         results: Seq[JsValue],
+                         failCount: Int = 0,
+                         cursors: => JsValue,
+                         nextQuery: => Option[JsValue]): JsValue = {
 
-  private def buildRawEdges(queryOption: QueryOption,
-                            queryRequestWithResultLs: Seq[QueryRequestWithResult],
-                            excludeIds: Map[Int, Boolean],
-                            scoreWeight: Double = 1.0): (ListBuffer[JsValue], ListBuffer[RAW_EDGE]) = {
-    val degrees = ListBuffer[JsValue]()
-    val rawEdges = ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]()
-    val metaPropNamesMap = scala.collection.mutable.Map[String, Int]()
-    for {
-      queryRequestWithResult <- queryRequestWithResultLs
-    } yield {
-      val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      queryRequest.queryParam.label.metaPropNames.foreach { metaPropName =>
-        metaPropNamesMap.put(metaPropName, metaPropNamesMap.getOrElse(metaPropName, 0) + 1)
-      }
-    }
-    val propsExistInAll = metaPropNamesMap.filter(kv => kv._2 == queryRequestWithResultLs.length)
-    val orderByColumns = queryOption.orderByColumns.filter { case (column, _) =>
-      column match {
-        case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => true
-        case _ =>
-          propsExistInAll.contains(column)
-//          //TODO??
-//          false
-//          queryParam.label.metaPropNames.contains(column)
-      }
-    }
+    val kvs = new ArrayBuffer[(String, JsValue)]()
 
-    /* build result jsons */
-    for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      queryParam = queryRequest.queryParam
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-      hashKey = toHashKey(edge, queryRequest.queryParam, queryOption.filterOutFields)
-      if !excludeIds.contains(hashKey)
-    } {
+    kvs.append("size" -> JsNumber(size))
+    kvs.append("degrees" -> JsArray(degrees))
+    kvs.append("results" -> JsArray(results))
 
-      // edge to json
-      val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
-      val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType)
-      if (edge.isDegree && fromOpt.isDefined) {
-        if (queryOption.limitOpt.isEmpty) {
-          degrees += Json.obj(
-            "from" -> fromOpt.get,
-            "label" -> queryRequest.queryParam.label.label,
-            "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir),
-            LabelMeta.degree.name -> innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG)
-          )
-        }
-      } else {
-        val keyWithJs = edgeToJson(edge, score, queryRequest.query, queryRequest.queryParam)
-        val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match {
-          case 0 =>
-            (None, None, None, None)
-          case 1 =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, None, None, None)
-          case 2 =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, v2, None, None)
-          case 3 =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, v2, v3, None)
-          case _ =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, v2, v3, v4)
-        }
+    if (queryOption.impIdOpt.isDefined) kvs.append(Experiment.ImpressionKey -> JsString(queryOption.impIdOpt.get))
 
-        val currentEdge = (keyWithJs, score * scoreWeight, orderByValues)
-        rawEdges += currentEdge
-      }
-    }
-    (degrees, rawEdges)
+    JsObject(kvs)
   }
 
-  private def buildResultJsValue(queryOption: QueryOption,
-                                 degrees: ListBuffer[JsValue],
-                                 rawEdges: ListBuffer[RAW_EDGE]): JsValue = {
-    if (queryOption.groupByColumns.isEmpty) {
-      // ordering
-      val filteredEdges = rawEdges.filter(t => t._2 >= queryOption.scoreThreshold)
+  def toJson(graph: Graph,
+             queryOption: QueryOption,
+             stepResult: StepResult): JsValue = {
 
-      val edges = queryOption.limitOpt match {
-        case None => orderBy(queryOption, filteredEdges).map(_._1)
-        case Some(limit) => orderBy(queryOption, filteredEdges).map(_._1).take(limit)
-      }
-      val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees
 
-      Json.obj(
-        "size" -> edges.size,
-        "degrees" -> resultDegrees,
-//        "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
-        "results" -> edges
-      )
-    } else {
-      val grouped = rawEdges.groupBy { case (keyWithJs, _, _) =>
-        val props = keyWithJs.get("props")
+    val degrees =
+      if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(t.s2Edge, t.score, true))
+      else emptyDegrees
 
-        for {
-          column <- queryOption.groupByColumns
-          value <- keyWithJs.get(column) match {
-            case None => props.flatMap { js => (js \ column).asOpt[JsValue] }
-            case Some(x) => Some(x)
-          }
-        } yield column -> value
+    if (queryOption.groupBy.keys.isEmpty) {
+      // no group by specified on query.
+
+      val ls = stepResult.results.map { t =>
+        val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
+        s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
       }
+      withImpressionId(queryOption, ls.size, degrees, ls)
+    } else {
 
-      val groupedEdgesWithScoreSum =
+      val results =
         for {
-          (groupByKeyVals, groupedRawEdges) <- grouped
-          scoreSum = groupedRawEdges.map(x => x._2).sum if scoreSum >= queryOption.scoreThreshold
+          (groupByValues, (scoreSum, edges)) <- stepResult.grouped
         } yield {
-          // ordering
-          val edges = orderBy(queryOption, groupedRawEdges).map(_._1)
+          val groupByKeyValues = queryOption.groupBy.keys.zip(groupByValues).map { case (k, valueOpt) =>
+            k -> valueOpt.flatMap(anyValToJsValue).getOrElse(JsNull)
+          }
+          val groupByValuesJson = Json.toJson(groupByKeyValues.toMap)
 
-          //TODO: refactor this
-          val js = if (queryOption.returnAgg)
+          if (!queryOption.returnAgg) {
             Json.obj(
-              "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+              "groupBy" -> groupByValuesJson,
               "scoreSum" -> scoreSum,
-              "agg" -> edges
+              "agg" -> Json.arr()
             )
-          else
+          } else {
+            val agg = edges.map { t =>
+              val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
+              s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
+            }
+            val aggJson = Json.toJson(agg)
             Json.obj(
-              "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+              "groupBy" -> groupByValuesJson,
               "scoreSum" -> scoreSum,
-              "agg" -> Json.arr()
+              "agg" -> aggJson
             )
-          (js, scoreSum)
-        }
-
-      val groupedSortedJsons = queryOption.limitOpt match {
-        case None =>
-          groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1)
-        case Some(limit) =>
-          groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1).take(limit)
-      }
-      val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees
-      Json.obj(
-        "size" -> groupedSortedJsons.size,
-        "degrees" -> resultDegrees,
-//        "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
-        "results" -> groupedSortedJsons
-      )
-    }
-  }
-
-  def toSimpleVertexArrJsonMulti(queryOption: QueryOption,
-                                 resultWithExcludeLs: Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])],
-                                 excludes: Seq[QueryRequestWithResult]): JsValue = {
-    val excludeIds = (Seq((Seq.empty, excludes)) ++ resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, excludes)) =>
-      acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap
-    }
-
-    val (degrees, rawEdges) = (ListBuffer.empty[JsValue], ListBuffer.empty[RAW_EDGE])
-    for {
-      (result, localExclude) <- resultWithExcludeLs
-    } {
-      val newResult = result.map { queryRequestWithResult =>
-        val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-        val newQuery = queryRequest.query.copy(queryOption = queryOption)
-        queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = newQuery))
-      }
-      val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, excludeIds)
-      degrees ++= _degrees
-      rawEdges ++= _rawEdges
-    }
-    buildResultJsValue(queryOption, degrees, rawEdges)
-  }
-
-  def toSimpleVertexArrJson(queryOption: QueryOption,
-                            queryRequestWithResultLs: Seq[QueryRequestWithResult],
-                            exclude: Seq[QueryRequestWithResult]): JsValue = {
-    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
-    val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds)
-    buildResultJsValue(queryOption, degrees, rawEdges)
-  }
-
-
-  def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult],
-                            exclude: Seq[QueryRequestWithResult]): JsValue = {
-
-    queryRequestWithResultLs.headOption.map { queryRequestWithResult =>
-      val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      val query = queryRequest.query
-      val queryOption = query.queryOption
-      val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
-      val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds)
-      buildResultJsValue(queryOption, degrees, rawEdges)
-    } getOrElse emptyResults
-  }
-
-  def verticesToJson(vertices: Iterable[Vertex]) = {
-    Json.toJson(vertices.flatMap { v => vertexToJson(v) })
-  }
-
-  def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
-    for {
-      (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
-      labelMeta <- queryParam.label.metaPropsMap.get(seq)
-      jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType)
-    } yield labelMeta.name -> jsValue
-  }
-
-  private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, queryParam: QueryParam): JsValue = {
-    if (parentEdges.isEmpty) {
-      JsNull
-    } else {
-      val parents = for {
-        parent <- parentEdges
-        (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get
-        parentQueryParam = QueryParam(parentEdge.labelWithDir)
-        parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if parents != JsNull
-      } yield {
-        val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge)
-        val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, parentQueryParam) + ("parents" -> parents)
-        Json.toJson(edgeJson)
-      }
-
-      Json.toJson(parents)
-    }
-  }
-
-  /** TODO */
-  def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
-    val (srcColumn, tgtColumn) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
-
-    val kvMapOpt = for {
-      from <- innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType)
-      to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType)
-    } yield {
-      val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props"
-      val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam)
-      val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap
-
-      val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, column) =>
-        val jsValue = column match {
-          case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - (System.currentTimeMillis() - queryParam.timestamp))
-          case "from" => from
-          case "to" => to
-          case "label" => JsString(queryParam.label.label)
-          case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
-          case "_timestamp" | "timestamp" => JsNumber(edge.ts)
-          case "score" => JsNumber(score)
-          case "props" if propsMap.nonEmpty => Json.toJson(propsMap)
-          case _ => JsNull
+          }
         }
-
-        if (jsValue == JsNull) map else map + (column -> jsValue)
-      }
-      kvMap
-    }
-
-    kvMapOpt.getOrElse(Map.empty)
-  }
-
-  def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
-    val kvs = edgeToJsonInner(edge, score, q, queryParam)
-    if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> Json.toJson(edgeParent(edge.parentEdges, q, queryParam)))
-    else kvs
-  }
-
-  def vertexToJson(vertex: Vertex): Option[JsObject] = {
-    val serviceColumn = ServiceColumn.findById(vertex.id.colId)
-
-    for {
-      id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType)
-    } yield {
-      Json.obj("serviceName" -> serviceColumn.service.serviceName,
-        "columnName" -> serviceColumn.columnName,
-        "id" -> id, "props" -> propsToJson(vertex),
-        "timestamp" -> vertex.ts,
-        //        "belongsTo" -> vertex.belongLabelIds)
-        "belongsTo" -> vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label)))
+      withImpressionId(queryOption, results.size, degrees, results)
     }
   }
-
-  private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, InnerValLike]) = {
-    for {
-      (seq, value) <- props
-      name <- seqsToNames.get(seq)
-    } yield (name, value)
-  }
-
-  private def propsToJson(vertex: Vertex) = {
-    val serviceColumn = vertex.serviceColumn
-    val props = for {
-      (propKey, innerVal) <- vertex.props
-      columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, propKey.toByte, useCache = true)
-      jsValue <- innerValToJsValue(innerVal, columnMeta.dataType)
-    } yield {
-      (columnMeta.name -> jsValue)
-    }
-    props.toMap
-  }
-
-  def propsToJson(edge: Edge) = {
-    for {
-      (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
-      metaProp <- edge.label.metaPropsMap.get(seq)
-      jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType)
-    } yield {
-      (metaProp.name, jsValue)
-    }
-  }
-
-  def summarizeWithListExclude(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = {
-    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
-
-
-    val groupedEdgesWithRank = (for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields))
-    } yield {
-      (edge, score)
-    }).groupBy { case (edge, score) =>
-      (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId)
-    }
-
-    val jsons = for {
-      ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank
-      (edges, ranks) = edgesAndRanks.groupBy(x => x._1.srcVertex).map(_._2.head).unzip
-      tgtId <- innerValToJsValue(target, tgtColumn.columnType)
-    } yield {
-      Json.obj(tgtColumn.columnName -> tgtId,
-        s"${srcColumn.columnName}s" ->
-          edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)), "scoreSum" -> ranks.sum)
-    }
-    val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ "scoreSum").as[Double] }.reverse
-    if (queryRequestWithResultLs.isEmpty) {
-      Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
-    } else {
-      Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons,
-        "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId())
-    }
-
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 28d78a9..ef40688 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core
 
 import com.google.common.hash.Hashing
 import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.parsers.{Where, WhereParser}
 import org.apache.s2graph.core.types.{HBaseSerializable, InnerVal, InnerValLike, LabelWithDirection}
@@ -51,6 +52,12 @@ object Query {
   }
 }
 
+object GroupBy {
+  val Empty = GroupBy()
+}
+case class GroupBy(keys: Seq[String] = Nil,
+                   limit: Int = Int.MaxValue)
+
 case class MultiQuery(queries: Seq[Query],
                       weights: Seq[Double],
                       queryOption: QueryOption,
@@ -58,7 +65,7 @@ case class MultiQuery(queries: Seq[Query],
 
 case class QueryOption(removeCycle: Boolean = false,
                        selectColumns: Seq[String] = Seq.empty,
-                       groupByColumns: Seq[String] = Seq.empty,
+                       groupBy: GroupBy = GroupBy.Empty,
                        orderByColumns: Seq[(String, Boolean)] = Seq.empty,
                        filterOutQuery: Option[Query] = None,
                        filterOutFields: Seq[String] = Seq(LabelMeta.to.name),
@@ -67,8 +74,11 @@ case class QueryOption(removeCycle: Boolean = false,
                        limitOpt: Option[Int] = None,
                        returnAgg: Boolean = true,
                        scoreThreshold: Double = Double.MinValue,
-                       returnDegree: Boolean = true)
-
+                       returnDegree: Boolean = true,
+                       impIdOpt: Option[String] = None) {
+  val orderByKeys = orderByColumns.map(_._1)
+  val ascendingVals = orderByColumns.map(_._2)
+}
 
 case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
                  steps: IndexedSeq[Step] = Vector.empty[Step],
@@ -77,8 +87,7 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
 
   val removeCycle = queryOption.removeCycle
   val selectColumns = queryOption.selectColumns
-//  val groupBy = queryOption.groupBy
-  val groupByColumns = queryOption.groupByColumns
+  val groupBy = queryOption.groupBy
   val orderByColumns = queryOption.orderByColumns
   val filterOutQuery = queryOption.filterOutQuery
   val filterOutFields = queryOption.filterOutFields
@@ -90,7 +99,7 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
 
   def cacheKeyBytes: Array[Byte] = {
     val selectBytes = Bytes.toBytes(queryOption.selectColumns.toString)
-    val groupBytes = Bytes.toBytes(queryOption.groupByColumns.toString)
+    val groupBytes = Bytes.toBytes(queryOption.groupBy.keys.toString)
     val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString)
     val filterOutBytes = queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte])
     val returnTreeBytes = Bytes.toBytes(queryOption.returnTree)
@@ -279,11 +288,23 @@ case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = S
     bytes
   }
 }
+case class S2Request(labelName: String,
+                     direction: String = "out",
+                     ts: Long = System.currentTimeMillis(),
+                     options: Map[String, Any] = Map.empty) {
+  val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+  val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+  val labelWithDir = LabelWithDirection(label.id.get, dir)
+  //TODO: need to merge options into queryParam.
+  val queryParam = QueryParam(labelWithDir, ts)
 
+}
 object QueryParam {
   lazy val Empty = QueryParam(LabelWithDirection(0, 0))
   lazy val DefaultThreshold = Double.MinValue
   val Delimiter = ","
+  val maxMetaByte = (-1).toByte
+  val fillArray = Array.fill(100)(maxMetaByte)
 }
 
 case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System.currentTimeMillis()) {
@@ -381,13 +402,13 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System
   }
 
   def limit(offset: Int, limit: Int): QueryParam = {
-    /* since degree info is located on first always */
-    if (offset == 0 && this.columnRangeFilter == null) {
-      this.limit = limit + 1
-      this.offset = offset
-    } else {
-      this.limit = limit
-      this.offset = offset + 1
+    /** degree info is always located first, so without a column range filter widen limit (offset 0) or shift offset by one */
+    this.limit = limit
+    this.offset = offset
+
+    if (this.columnRangeFilter == null) {
+      if (offset == 0) this.limit = limit + 1
+      else this.offset = offset + 1
     }
     //    this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, this.offset)
     this
@@ -400,26 +421,24 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System
     }
   }
 
-  def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = {
-    //    val len = label.orderTypes.size.toByte
-    //    val len = label.extraIndicesMap(labelOrderSeq).sortKeyTypes.size.toByte
-    //    logger.error(s"indicesMap: ${label.indicesMap(labelOrderSeq)}")
-    val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
+  def paddingInterval(len: Byte, from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]) = {
+    val fromVal = Bytes.add(propsToBytes(from), QueryParam.fillArray)
+    val toVal = propsToBytes(to)
 
-    val minMetaByte = InnerVal.minMetaByte
-    //    val maxMetaByte = InnerVal.maxMetaByte
-    val maxMetaByte = -1.toByte
-    val toVal = Bytes.add(propsToBytes(to), Array.fill(1)(minMetaByte))
-    //FIXME
-    val fromVal = Bytes.add(propsToBytes(from), Array.fill(10)(maxMetaByte))
     toVal(0) = len
     fromVal(0) = len
-    val maxBytes = fromVal
-    val minBytes = toVal
+
+    val minMax = (toVal, fromVal) // inverted: `to` bytes are the min bound, padded `from` bytes the max
+    minMax
+  }
+
+  def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = {
+    val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
+    val (minBytes, maxBytes) = paddingInterval(len, from, to)
+
     this.columnRangeFilterMaxBytes = maxBytes
     this.columnRangeFilterMinBytes = minBytes
-    val rangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true)
-    this.columnRangeFilter = rangeFilter
+    this.columnRangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true)
     this
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 5343659..5b2622f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -21,31 +21,42 @@ package org.apache.s2graph.core
 
 import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
 
-import scala.collection.Seq
+import scala.collection.mutable.ListBuffer
+import scala.collection.{Seq, mutable}
 
 object QueryResult {
-  def fromVertices(query: Query): Seq[QueryRequestWithResult] = {
+  def fromVertices(query: Query): StepInnerResult = {
     if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) {
-      Seq.empty
+      StepInnerResult.Empty
     } else {
       val queryParam = query.steps.head.queryParams.head
       val label = queryParam.label
       val currentTs = System.currentTimeMillis()
       val propsWithTs = Map(LabelMeta.timeStampSeq ->
         InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs))
-      for {
+      val edgeWithScoreLs = for {
         vertex <- query.vertices
       } yield {
-        val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
-        val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
-        QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam),
-          QueryResult(edgeWithScoreLs = Seq(edgeWithScore)))
-      }
+          val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
+          val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
+          edgeWithScore
+        }
+      StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, Nil, false)
     }
   }
 }
-case class QueryRequestWithResult(queryRequest: QueryRequest, queryResult: QueryResult)
+/** inner traverse result: scored edges, degree edges, and a failure flag. */
+object StepInnerResult {
+  val Failure = StepInnerResult(Nil, Nil, true)
+  val Empty = StepInnerResult(Nil, Nil, false)
+}
+case class StepInnerResult(edgesWithScoreLs: Seq[EdgeWithScore],
+                           degreeEdges: Seq[EdgeWithScore],
+                           isFailure: Boolean = false) {
+  val isEmpty = edgesWithScoreLs.isEmpty
+}
 
 case class QueryRequest(query: Query,
                         stepIdx: Int,
@@ -59,3 +70,206 @@ case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil,
                        isFailure: Boolean = false)
 
 case class EdgeWithScore(edge: Edge, score: Double)
+
+
+
+
+
+/** result: ordered flat edges, grouped edges (with score sums), and degree edges. */
+
+object StepResult {
+
+  type Values = Seq[S2EdgeWithScore]
+  type GroupByKey = Seq[Option[Any]]
+  val EmptyOrderByValues = (None, None, None, None)
+  val Empty = StepResult(Nil, Nil, Nil)
+
+
+  def mergeOrdered(left: StepResult.Values,
+                   right: StepResult.Values,
+                   queryOption: QueryOption): (Double, StepResult.Values) = {
+    val merged = (left ++ right)
+    val scoreSum = merged.foldLeft(0.0) { case (prev, current) => prev + current.score }
+    if (scoreSum < queryOption.scoreThreshold) (0.0, Nil)
+    else {
+      val ordered = orderBy(queryOption, merged)
+      val filtered = ordered.take(queryOption.groupBy.limit)
+      val newScoreSum = filtered.foldLeft(0.0) { case (prev, current) => prev + current.score }
+      (newScoreSum, filtered)
+    }
+  }
+
+  def orderBy(queryOption: QueryOption, notOrdered: Values): Values = {
+    import OrderingUtil._
+
+    if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
+      notOrdered.sortBy(_.orderByValues)(TupleMultiOrdering[Any](queryOption.ascendingVals))
+    } else {
+      notOrdered
+    }
+  }
+  def toOrderByValues(s2Edge: Edge,
+                      score: Double,
+                      orderByKeys: Seq[String]): (Any, Any, Any, Any) = {
+    def toValue(propertyKey: String): Any = {
+      propertyKey match {
+        case "score" => score
+        case "timestamp" | "_timestamp" => s2Edge.ts
+        case _ => s2Edge.properties.get(propertyKey)
+      }
+    }
+    if (orderByKeys.isEmpty) (None, None, None, None)
+    else {
+      orderByKeys.length match {
+        case 1 =>
+          (toValue(orderByKeys(0)), None, None, None)
+        case 2 =>
+          (toValue(orderByKeys(0)), toValue(orderByKeys(1)), None, None)
+        case 3 =>
+          (toValue(orderByKeys(0)), toValue(orderByKeys(1)), toValue(orderByKeys(2)), None)
+        case _ =>
+          (toValue(orderByKeys(0)), toValue(orderByKeys(1)), toValue(orderByKeys(2)), toValue(orderByKeys(3)))
+      }
+    }
+  }
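+  /**
+   * Merges multiple StepResults into one StepResult, scaling each edge score by its weight.
+   * @param queryOption supplies the groupBy keys and orderBy settings applied to the merged edges.
+   * @param multiStepResults results to merge; zipped with `weights`, so entries without a weight (all of them for the default Nil) contribute only degree edges.
+   * @return a StepResult with the re-ordered flat results, groups sorted by descending score sum, and all degree edges.
+   */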
+  def merges(queryOption: QueryOption,
+             multiStepResults: Seq[StepResult],
+             weights: Seq[Double] = Nil): StepResult = {
+    val degrees = multiStepResults.flatMap(_.degreeEdges)
+    val ls = new mutable.ListBuffer[S2EdgeWithScore]()
+    val agg= new mutable.HashMap[GroupByKey, ListBuffer[S2EdgeWithScore]]()
+    val sums = new mutable.HashMap[GroupByKey, Double]()
+
+    for {
+      (weight, eachStepResult) <- weights.zip(multiStepResults)
+      (ordered, grouped) = (eachStepResult.results, eachStepResult.grouped)
+    } {
+      ordered.foreach { t =>
+        val newScore = t.score * weight
+        ls += t.copy(score = newScore)
+      }
+
+      // process each query's stepResult's grouped
+      for {
+        (groupByKey, (scoreSum, values)) <- grouped
+      } {
+        val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore])
+        var scoreSum = 0.0
+        values.foreach { t =>
+          val newScore = t.score * weight
+          buffer += t.copy(score = newScore)
+          scoreSum += newScore
+        }
+        sums += (groupByKey -> scoreSum)
+      }
+    }
+
+    // process global groupBy
+    if (queryOption.groupBy.keys.nonEmpty) {
+      for {
+        s2EdgeWithScore <- ls
+        groupByKey = s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys)
+      } {
+        val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore])
+        buffer += s2EdgeWithScore
+        val newScore = sums.getOrElse(groupByKey, 0.0) + s2EdgeWithScore.score
+        sums += (groupByKey -> newScore)
+      }
+    }
+
+
+    val ordered = orderBy(queryOption, ls)
+    val grouped = for {
+      (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1)
+      aggregated = agg(groupByKey) if aggregated.nonEmpty
+    } yield groupByKey -> (scoreSum, aggregated)
+
+    StepResult(results = ordered, grouped = grouped, degrees)
+  }
+
+  //TODO: Optimize this.
+  /**
+   * Removes from `baseStepResult` every edge whose selected field values also occur
+   * among the edges of `filterOutStepInnerResult`.
+   *
+   * Matching is currently always done on the `"to"` field: the original code picked
+   * `Seq("to")` in both branches of its check on `queryOption.filterOutFields`, with the
+   * custom-field branch commented out.
+   *
+   * @param graph                    not used by this method's body
+   * @param queryOption              query options (currently not consulted, see above)
+   * @param baseStepResult           the result to filter
+   * @param filterOutStepInnerResult edges whose selected values must be excluded
+   * @return a [[StepResult]] whose `results` no longer contain the excluded edges, whose
+   *         groups have the excluded edges removed and their score sum reduced by the
+   *         removed edges' scores (groups left without any edge are dropped), and whose
+   *         `degreeEdges` are taken unchanged from `baseStepResult`
+   */
+  def filterOut(graph: Graph,
+                queryOption: QueryOption,
+                baseStepResult: StepResult,
+                filterOutStepInnerResult: StepInnerResult): StepResult = {
+
+    // TODO: switch to `queryOption.filterOutFields` when it is non-empty once custom
+    // filter-out fields are supported; until then only "to" is compared.
+    val fields = Seq("to")
+
+    val filterOutSet = filterOutStepInnerResult.edgesWithScoreLs.map { t =>
+      t.edge.selectValues(fields)
+    }.toSet
+
+    // Single definition of "this edge must be removed", shared by both passes below.
+    def isFilteredOut(candidate: S2EdgeWithScore): Boolean =
+      filterOutSet.contains(candidate.s2Edge.selectValues(fields))
+
+    val filteredResults = baseStepResult.results.filter(t => !isFilteredOut(t))
+
+    val grouped = for {
+      (key, (scoreSum, values)) <- baseStepResult.grouped
+      (removed, kept) = values.partition(v => isFilteredOut(v))
+      if kept.nonEmpty
+    } yield key -> (scoreSum - removed.map(_.score).sum, kept)
+
+    StepResult(results = filteredResults, grouped = grouped, baseStepResult.degreeEdges)
+  }
+  /**
+   * Builds a [[StepResult]] from the raw edges of `stepInnerResult`.
+   *
+   * Each edge is wrapped in an [[S2EdgeWithScore]] carrying its score, its order-by
+   * values and the edge's `parentEdges`. The flat result is then passed through
+   * `orderBy`, and, when `queryOption.groupBy.keys` is non-empty, the edges are also
+   * grouped and the groups sorted by descending score sum.
+   *
+   * @param graph           not used by this method's body
+   * @param queryOption     supplies the order-by and group-by settings
+   * @param stepInnerResult the edges (and degree edges) to post-process
+   * @return the ordered flat results, the ordered groups (`Nil` when no group-by keys
+   *         are set) and the degree edges converted to [[S2EdgeWithScore]]
+   */
+  def apply(graph: Graph,
+            queryOption: QueryOption,
+            stepInnerResult: StepInnerResult): StepResult = {
+    logger.debug(s"[BeforePostProcess]: ${stepInnerResult.edgesWithScoreLs.size}")
+
+    val results = stepInnerResult.edgesWithScoreLs.map { edgeWithScore =>
+      val edge = edgeWithScore.edge
+      // Without order-by columns only the score is kept as an ordering value.
+      val orderByValues =
+        if (queryOption.orderByColumns.isEmpty) (edgeWithScore.score, None, None, None)
+        else toOrderByValues(edge, edgeWithScore.score, queryOption.orderByKeys)
+
+      S2EdgeWithScore(edge, edgeWithScore.score, orderByValues, edge.parentEdges)
+    }
+
+    /** ordered flatten result */
+    val ordered = orderBy(queryOption, results)
+
+    /** ordered grouped result */
+    val grouped =
+      if (queryOption.groupBy.keys.isEmpty) Nil
+      else {
+        val agg = new mutable.HashMap[GroupByKey, (Double, Values)]()
+        val byStringKey = results.groupBy { s2EdgeWithScore =>
+          s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys, useToString = true)
+        }
+        // Pure side effect on `agg`, hence foreach rather than map.
+        byStringKey.foreach { case (_, ls) =>
+          val (scoreSum, merged) = mergeOrdered(ls, Nil, queryOption)
+          /**
+           * watch out here. by calling toString on Any, we lose type information which will be used
+           * later for toJson. The group key is therefore re-derived, without toString, from the
+           * first merged edge; groups whose merge result is empty are skipped.
+           */
+          merged.headOption.foreach { first =>
+            val newKey = first.s2Edge.selectValues(queryOption.groupBy.keys, useToString = false)
+            agg += (newKey -> ((scoreSum, merged)))
+          }
+        }
+        // Highest score sum first.
+        agg.toSeq.sortBy(_._2._1 * -1)
+      }
+
+    val degrees = stepInnerResult.degreeEdges.map(t => S2EdgeWithScore(t.edge, t.score))
+    StepResult(results = ordered, grouped = grouped, degreeEdges = degrees)
+  }
+}
+
+/**
+ * An edge paired with its score and the extra data attached to it in a [[StepResult]].
+ *
+ * @param s2Edge        the edge itself
+ * @param score         the score assigned to the edge
+ * @param orderByValues four values associated with ordering; defaults to
+ *                      `StepResult.EmptyOrderByValues`
+ * @param parentEdges   parent edges with their scores; defaults to `Nil`
+ */
+case class S2EdgeWithScore(s2Edge: Edge,
+                           score: Double,
+                           orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues,
+                           parentEdges: Seq[EdgeWithScore] = Nil)
+
+/**
+ * Result of one step, holding a flat view, a grouped view and the degree edges.
+ *
+ * @param results     the flat list of edges with scores
+ * @param grouped     groups as `(group-by key, (score sum, edges of the group))`
+ * @param degreeEdges the degree edges
+ */
+case class StepResult(results: StepResult.Values,
+                      grouped: Seq[(StepResult.GroupByKey, (Double, StepResult.Values))],
+                      degreeEdges: StepResult.Values) {
+  /** True when the flat `results` contain no edge; evaluated once at construction. */
+  val isEmpty: Boolean = results.isEmpty
+}
\ No newline at end of file