You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 16:01:24 UTC

[3/3] incubator-s2graph git commit: [S2GRAPH-121]: Create `Result` class to hold traverse result edges.

[S2GRAPH-121]: Create `Result` class to hold traverse result edges.

JIRA:
  [S2GRAPH-121] https://issues.apache.org/jira/browse/S2GRAPH-121

Pull Request:
  Closes #96

Authors
  DO YUNG YOON: steamshon@apache.org


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/8dbb9a3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/8dbb9a3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/8dbb9a3e

Branch: refs/heads/master
Commit: 8dbb9a3eeff6f412fe5defee2baffee32f174981
Parents: b590831
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Nov 16 16:56:41 2016 +0100
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Nov 16 17:01:07 2016 +0100

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/s2graph/core/mysqls/schema.sql   |   1 +
 .../scala/org/apache/s2graph/core/Edge.scala    |  82 ++-
 .../apache/s2graph/core/ExceptionHandler.scala  |  19 +-
 .../scala/org/apache/s2graph/core/Graph.scala   | 232 ++++---
 .../apache/s2graph/core/GraphExceptions.scala   |   2 +
 .../org/apache/s2graph/core/Management.scala    |  69 +--
 .../org/apache/s2graph/core/OrderingUtil.scala  |   7 +
 .../org/apache/s2graph/core/PostProcess.scala   | 601 ++++---------------
 .../org/apache/s2graph/core/QueryParam.scala    |  75 ++-
 .../org/apache/s2graph/core/QueryResult.scala   | 234 +++++++-
 .../scala/org/apache/s2graph/core/Vertex.scala  |  74 ++-
 .../apache/s2graph/core/mysqls/Experiment.scala |   3 +-
 .../org/apache/s2graph/core/mysqls/Label.scala  |  59 +-
 .../s2graph/core/mysqls/ServiceColumn.scala     |  31 +-
 .../s2graph/core/parsers/WhereParser.scala      |   2 +-
 .../s2graph/core/rest/RequestParser.scala       | 177 ++++--
 .../apache/s2graph/core/rest/RestHandler.scala  | 182 +++---
 .../apache/s2graph/core/storage/Storage.scala   | 336 ++++++-----
 .../core/storage/hbase/AsynchbaseStorage.scala  | 147 +++--
 .../indexedge/wide/IndexEdgeSerializable.scala  |   2 +-
 .../org/apache/s2graph/core/utils/Logger.scala  |   6 +
 .../org/apache/s2graph/core/EdgeTest.scala      |   5 +-
 .../s2graph/core/Integrate/CrudTest.scala       | 163 +++--
 .../core/Integrate/IntegrateCommon.scala        |  10 +-
 .../s2graph/core/Integrate/QueryTest.scala      | 159 ++---
 .../core/Integrate/StrongLabelDeleteTest.scala  |  14 +-
 .../core/Integrate/VertexTestHelper.scala       |   7 +-
 .../core/Integrate/WeakLabelDeleteTest.scala    |   9 +-
 .../apache/s2graph/core/JsonParserTest.scala    |   3 +-
 .../apache/s2graph/core/QueryParamTest.scala    |  56 +-
 .../s2graph/core/TestCommonWithModels.scala     |   2 +-
 .../core/benchmark/JsonBenchmarkSpec.scala      |  62 +-
 .../core/benchmark/SamplingBenchmarkSpec.scala  | 207 +++----
 .../org/apache/s2graph/rest/netty/Server.scala  |  96 ++-
 .../apache/s2graph/rest/play/Bootstrap.scala    |   4 +-
 .../s2graph/rest/play/config/Config.scala       |   4 +
 .../controllers/ApplicationController.scala     |   6 +-
 .../play/controllers/CounterController.scala    |  21 +-
 .../rest/play/controllers/EdgeController.scala  | 164 +++--
 .../play/controllers/ExperimentController.scala |   5 +-
 .../play/controllers/PublishController.scala    |   8 -
 .../rest/play/controllers/QueryController.scala |  21 +-
 .../play/controllers/VertexController.scala     |  18 +-
 s2rest_play/conf/reference.conf                 |   1 +
 s2rest_play/conf/routes                         |   7 +-
 46 files changed, 1892 insertions(+), 1503 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0c8edc3..73b3597 100644
--- a/CHANGES
+++ b/CHANGES
@@ -94,6 +94,8 @@ Release 0.1.0 - unreleased
     
     S2GRAPH-127: Refactor ExceptionHander Object into Class (Committed by DOYUNG YOON).
 
+    S2GRAPH-121: Create `Result` class to hold traverse result edges (Committed by DOYUNG YOON).
+
   BUG FIXES
 
     S2GRAPH-18: Query Option "interval" is Broken. 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
index d8ee5bc..822df6c 100644
--- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
+++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
@@ -96,6 +96,7 @@ CREATE TABLE `labels` (
   `is_async` tinyint(4) NOT NULL default '0',
   `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4',
   `options` text,
+  `deleted_at` datetime DEFAULT NULL,
   PRIMARY KEY (`id`),
   UNIQUE KEY `ux_label` (`label`),
   INDEX `idx_labels_src_column_name` (`src_column_name`),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 4d4afe0..97730f3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -19,10 +19,11 @@
 
 package org.apache.s2graph.core
 
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.JSONParser._
 import play.api.libs.json.{JsNumber, Json}
 import scala.collection.JavaConversions._
 import scala.util.hashing.MurmurHash3
@@ -112,7 +113,7 @@ case class IndexEdge(srcVertex: Vertex,
   lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
   lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v.innerVal
 
-  //  lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
+//  lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
 
   //TODO:
   //  lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList
@@ -151,8 +152,23 @@ case class Edge(srcVertex: Vertex,
   val schemaVer = label.schemaVersion
   val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong
 
+  lazy val srcId = srcVertex.innerIdVal
+  lazy val tgtId = tgtVertex.innerIdVal
+  lazy val labelName = label.label
+  lazy val direction = GraphUtil.fromDirection(labelWithDir.dir)
+  lazy val properties = toProps()
+
   def props = propsWithTs.mapValues(_.innerVal)
 
+
+  private def toProps(): Map[String, Any] = {
+    for {
+      (labelMeta, defaultVal) <- label.metaPropsDefaultMapInner
+    } yield {
+      labelMeta.name -> propsWithTs.getOrElse(labelMeta.seq, defaultVal).innerVal.value
+    }
+  }
+
   def relatedEdges = {
     if (labelWithDir.isDirected) {
       val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
@@ -204,10 +220,10 @@ case class Edge(srcVertex: Vertex,
 
   def isDegree = propsWithTs.contains(LabelMeta.degreeSeq)
 
-  //  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
-  //    case Some(_) => props
-  //    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
-  //  }
+//  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
+//    case Some(_) => props
+//    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
+//  }
 
   def propsPlusTsValid = propsWithTs.filter(kv => kv._1 >= 0)
 
@@ -290,8 +306,31 @@ case class Edge(srcVertex: Vertex,
 
     ret.mkString("\t")
   }
+
+  def selectValues(selectColumns: Seq[String],
+                   useToString: Boolean = true,
+                   score: Double = 0.0): Seq[Option[Any]] = {
+    //TODO: Option should be matched in JsonParser anyTo*
+    for {
+      selectColumn <- selectColumns
+    } yield {
+      val valueOpt = selectColumn match {
+        case LabelMeta.from.name | "from" => Option(srcId)
+        case LabelMeta.to.name | "to" => Option(tgtId)
+        case "label" => Option(labelName)
+        case "direction" => Option(direction)
+        case "score" => Option(score)
+        case LabelMeta.timestamp.name | "timestamp" => Option(ts)
+        case _ =>
+          properties.get(selectColumn)
+      }
+      if (useToString) valueOpt.map(_.toString)
+      else valueOpt
+    }
+  }
 }
 
+
 case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
                       edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
                       newSnapshotEdge: Option[SnapshotEdge] = None) {
@@ -310,6 +349,33 @@ object Edge {
   val incrementVersion = 1L
   val minTsVal = 0L
 
+  def toEdge(srcId: Any,
+                    tgtId: Any,
+                    labelName: String,
+                    direction: String,
+                    props: Map[String, Any] = Map.empty,
+                    ts: Long = System.currentTimeMillis(),
+                    operation: String = "insert"): Edge = {
+    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+
+    val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion)
+    val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion)
+
+    val srcColId = label.srcColumn.id.get
+    val tgtColId = label.tgtColumn.id.get
+
+    val srcVertex = Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis())
+    val tgtVertex = Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis())
+    val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+
+    val labelWithDir = LabelWithDirection(label.id.get, dir)
+    val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
+    val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+    new Edge(srcVertex, tgtVertex, labelWithDir, op = op, version = ts, propsWithTs = propsWithTs)
+  }
+
   /** now version information is required also **/
   type State = Map[Byte, InnerValLikeWithTs]
   type PropsPairWithTs = (State, State, Long, String)
@@ -387,7 +453,7 @@ object Edge {
 
       //      logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
       //      logger.error(s"$propsWithTs")
-      (requestEdge, edgeMutate)
+      (requestEdge.copy(propsWithTs = propsWithTs), edgeMutate)
     }
   }
 
@@ -582,7 +648,7 @@ object Edge {
     (propsWithTs, true)
   }
 
-  def fromString(s: String): Option[Edge] = Graph.toEdge(s)
+//  def fromString(s: String): Option[Edge] = Graph.toEdge(s)
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index d03d483..48976d3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -24,8 +24,11 @@ import java.util.Properties
 import com.typesafe.config.Config
 import org.apache.kafka.clients.producer._
 import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.JsValue
 
 class ExceptionHandler(config: Config) {
+
+
   import ExceptionHandler._
 
   val keyBrokerList = "kafka.metadata.broker.list"
@@ -43,7 +46,6 @@ class ExceptionHandler(config: Config) {
       }
     } else None
 
-
   def enqueue(m: KafkaMessage): Unit = {
     producer match {
       case None => logger.debug(s"skip log to Kafka: ${m}")
@@ -68,24 +70,33 @@ object ExceptionHandler {
   type Key = String
   type Val = String
 
-  def toKafkaMessage(topic: String, element: GraphElement, originalString: Option[String] = None) = {
+  def toKafkaMessage(topic: String,
+                     element: GraphElement,
+                     originalString: Option[String] = None,
+                     produceJson: Boolean = false) = {
+    val edgeString = originalString.getOrElse(element.toLogString())
+    val msg = edgeString
+
     KafkaMessage(
       new ProducerRecord[Key, Val](
         topic,
         element.queuePartitionKey,
-        originalString.getOrElse(element.toLogString())))
+        msg))
   }
 
+  // only used in deleteAll
   def toKafkaMessage(topic: String, tsv: String) = {
     KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
   }
 
+  def toKafkaMessage(topic: String, jsValue: JsValue): KafkaMessage = toKafkaMessage(topic, jsValue.toString())
+
   case class KafkaMessage(msg: ProducerRecord[Key, Val])
 
   private def toKafkaProp(config: Config) = {
     val props = new Properties()
 
-    /* all default configuration for new producer */
+    /** all default configuration for new producer */
     val brokers =
       if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
       else "localhost"

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index c25b71a..2cbddea 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -23,11 +23,13 @@ import java.util
 import java.util.concurrent.ConcurrentHashMap
 
 import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{Label, Model}
 import org.apache.s2graph.core.parsers.WhereParser
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
 import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.{JsObject, Json}
 
 import scala.collection.JavaConversions._
 import scala.collection._
@@ -58,6 +60,7 @@ object Graph {
     "back.off.timeout" -> java.lang.Integer.valueOf(1000),
     "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
     "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
+    "delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
     "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
     "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
     "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
@@ -84,10 +87,9 @@ object Graph {
     (hashKey, filterHashKey)
   }
 
-  def alreadyVisitedVertices(queryResultLs: Seq[QueryResult]): Map[(LabelWithDirection, Vertex), Boolean] = {
+  def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, Vertex), Boolean] = {
     val vertices = for {
-      queryResult <- queryResultLs
-      edgeWithScore <- queryResult.edgeWithScoreLs
+      edgeWithScore <- edgeWithScoreLs
       edge = edgeWithScore.edge
       vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
     } yield (edge.labelWithDir, vertex) -> true
@@ -132,81 +134,36 @@ object Graph {
     tsVal
   }
 
-  def aggregateScore(newScore: Double,
-                     resultEdges: ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
-                     duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
-                     edgeWithScoreSorted: ListBuffer[(HashKey, FilterHashKey, Edge, Double)],
-                     hashKey: HashKey,
-                     filterHashKey: FilterHashKey,
-                     queryParam: QueryParam,
-                     convertedEdge: Edge) = {
-
-    /* skip duplicate policy check if consistencyLevel is strong */
-    if (queryParam.label.consistencyLevel != "strong" && resultEdges.containsKey(hashKey)) {
-      val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey)
+  def processDuplicates(queryParam: QueryParam,
+                        duplicates: Seq[(FilterHashKey, EdgeWithScore)]): Seq[(FilterHashKey, EdgeWithScore)] = {
+
+    if (queryParam.label.consistencyLevel != "strong") {
       //TODO:
       queryParam.duplicatePolicy match {
-        case Query.DuplicatePolicy.First => // do nothing
-        case Query.DuplicatePolicy.Raw =>
-          if (duplicateEdges.containsKey(hashKey)) {
-            duplicateEdges.get(hashKey).append(convertedEdge -> newScore)
-          } else {
-            val newBuffer = new ListBuffer[(Edge, Double)]
-            newBuffer.append(convertedEdge -> newScore)
-            duplicateEdges.put(hashKey, newBuffer)
-          }
+        case Query.DuplicatePolicy.First => Seq(duplicates.head)
+        case Query.DuplicatePolicy.Raw => duplicates
         case Query.DuplicatePolicy.CountSum =>
-          resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 1))
+          val countSum = duplicates.size
+          Seq(duplicates.head._1 -> duplicates.head._2.copy(score = countSum))
         case _ =>
-          resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + newScore))
+          val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + current._2.score }
+          Seq(duplicates.head._1 -> duplicates.head._2.copy(score = scoreSum))
       }
     } else {
-      resultEdges.put(hashKey, (filterHashKey, convertedEdge, newScore))
-      edgeWithScoreSorted.append((hashKey, filterHashKey, convertedEdge, newScore))
+      duplicates
     }
   }
-
-  def aggregateResults(queryRequestWithResult: QueryRequestWithResult,
-                       queryParamResult: Result,
-                       edgesToInclude: util.HashSet[FilterHashKey],
-                       edgesToExclude: util.HashSet[FilterHashKey]): QueryRequestWithResult = {
-    val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-    val (query, stepIdx, _, queryParam) = QueryRequest.unapply(queryRequest).get
-
-    val (duplicateEdges, resultEdges, edgeWithScoreSorted) = queryParamResult
-    val edgesWithScores = for {
-      (hashKey, filterHashKey, edge, _) <- edgeWithScoreSorted if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
-      score = resultEdges.get(hashKey)._3
-      (duplicateEdge, aggregatedScore) <- fetchDuplicatedEdges(edge, score, hashKey, duplicateEdges) if aggregatedScore >= queryParam.threshold
-    } yield EdgeWithScore(duplicateEdge, aggregatedScore)
-
-    QueryRequestWithResult(queryRequest, QueryResult(edgesWithScores))
-  }
-
-  def fetchDuplicatedEdges(edge: Edge,
-                           score: Double,
-                           hashKey: HashKey,
-                           duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]) = {
-    (edge -> score) +: (if (duplicateEdges.containsKey(hashKey)) duplicateEdges.get(hashKey) else Seq.empty)
-  }
-
-  def queryResultWithFilter(queryRequestWithResult: QueryRequestWithResult) = {
-    val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-    val (_, _, _, queryParam) = QueryRequest.unapply(queryRequest).get
-    val whereFilter = queryParam.where.get
-    if (whereFilter == WhereParser.success) queryResult.edgeWithScoreLs
-    else queryResult.edgeWithScoreLs.withFilter(edgeWithScore => whereFilter.filter(edgeWithScore.edge))
-  }
-
-  def filterEdges(queryResultLsFuture: Future[Seq[QueryRequestWithResult]],
+  def filterEdges(q: Query,
+                  stepIdx: Int,
+                  queryRequests: Seq[QueryRequest],
+                  queryResultLsFuture: Future[Seq[StepInnerResult]],
+                  queryParams: Seq[QueryParam],
                   alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean])
-                 (implicit ec: scala.concurrent.ExecutionContext): Future[Seq[QueryRequestWithResult]] = {
+                 (implicit ec: scala.concurrent.ExecutionContext): Future[StepInnerResult] = {
 
     queryResultLsFuture.map { queryRequestWithResultLs =>
-      if (queryRequestWithResultLs.isEmpty) Nil
+      if (queryRequestWithResultLs.isEmpty) StepInnerResult.Empty
       else {
-        val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get
-        val (q, stepIdx, srcVertex, queryParam) = QueryRequest.unapply(queryRequest).get
         val step = q.steps(stepIdx)
 
         val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None
@@ -219,73 +176,114 @@ object Graph {
         val edgesToExclude = new util.HashSet[FilterHashKey]()
         val edgesToInclude = new util.HashSet[FilterHashKey]()
 
-        val queryParamResultLs = new ListBuffer[Result]
-        queryRequestWithResultLs.foreach { queryRequestWithResult =>
-          val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+        val sequentialLs = new ListBuffer[(HashKey, FilterHashKey, EdgeWithScore)]()
+        val agg = new mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, EdgeWithScore)]]()
+        val params = new mutable.HashMap[HashKey, QueryParam]()
+        var numOfDuplicates = 0
+        var numOfTotal = 0
+        queryRequests.zip(queryRequestWithResultLs).foreach { case (queryRequest, stepInnerResult) =>
           val queryParam = queryRequest.queryParam
-          val duplicateEdges = new util.concurrent.ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]()
-          val resultEdges = new util.concurrent.ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)]()
-          val edgeWithScoreSorted = new ListBuffer[(HashKey, FilterHashKey, Edge, Double)]
           val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
-
-          // store degree value with Array.empty so if degree edge exist, it comes at very first.
-          def checkDegree() = queryResult.edgeWithScoreLs.headOption.exists { edgeWithScore =>
-            edgeWithScore.edge.isDegree
-          }
-          var isDegree = checkDegree()
-
           val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir
           val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey)
           val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey)
-
-          queryResultWithFilter(queryRequestWithResult).foreach { edgeWithScore =>
-            val (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+          val where = queryParam.where.get
+
+          for {
+            edgeWithScore <- stepInnerResult.edgesWithScoreLs
+            if where == WhereParser.success || where.filter(edgeWithScore.edge)
+            (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+          } {
+            numOfTotal += 1
             if (queryParam.transformer.isDefault) {
               val convertedEdge = edge
 
-              val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+              val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
 
-              /* check if this edge should be exlcuded. */
-              if (shouldBeExcluded && !isDegree) {
+              /** check if this edge should be exlcuded. */
+              if (shouldBeExcluded) {
                 edgesToExclude.add(filterHashKey)
               } else {
-                if (shouldBeIncluded && !isDegree) {
+                if (shouldBeIncluded) {
                   edgesToInclude.add(filterHashKey)
                 }
                 val tsVal = processTimeDecay(queryParam, convertedEdge)
                 val newScore = labelWeight * score * tsVal
-                aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+                val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
+                sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
+                agg.get(hashKey) match {
+                  case None =>
+                    val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
+                    newLs += (filterHashKey -> newEdgeWithScore)
+                    agg += (hashKey -> newLs)
+                  case Some(old) =>
+                    numOfDuplicates += 1
+                    old += (filterHashKey -> newEdgeWithScore)
+                }
+                params += (hashKey -> queryParam)
               }
             } else {
               convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge =>
-                val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+                val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
 
-                /* check if this edge should be exlcuded. */
-                if (shouldBeExcluded && !isDegree) {
+                /** check if this edge should be exlcuded. */
+                if (shouldBeExcluded) {
                   edgesToExclude.add(filterHashKey)
                 } else {
-                  if (shouldBeIncluded && !isDegree) {
+                  if (shouldBeIncluded) {
                     edgesToInclude.add(filterHashKey)
                   }
                   val tsVal = processTimeDecay(queryParam, convertedEdge)
                   val newScore = labelWeight * score * tsVal
-                  aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+                  val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
+                  sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
+                  agg.get(hashKey) match {
+                    case None =>
+                      val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
+                      newLs += (filterHashKey -> newEdgeWithScore)
+                      agg += (hashKey -> newLs)
+                    case Some(old) =>
+                      numOfDuplicates += 1
+                      old += (filterHashKey -> newEdgeWithScore)
+                  }
+                  params += (hashKey -> queryParam)
                 }
               }
             }
-            isDegree = false
           }
-          val ret = (duplicateEdges, resultEdges, edgeWithScoreSorted)
-          queryParamResultLs.append(ret)
         }
 
-        val aggregatedResults = for {
-          (queryRequestWithResult, queryParamResult) <- queryRequestWithResultLs.zip(queryParamResultLs)
-        } yield {
-            aggregateResults(queryRequestWithResult, queryParamResult, edgesToInclude, edgesToExclude)
+
+        val edgeWithScoreLs = new ListBuffer[EdgeWithScore]()
+        if (numOfDuplicates == 0) {
+          // no duplicates at all.
+          for {
+            (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
+            if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+          } {
+            edgeWithScoreLs += edgeWithScore
           }
+        } else {
+          // need to resolve duplicates.
+          val seen = new mutable.HashSet[HashKey]()
+          for {
+            (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
+            if !seen.contains(hashKey)
+            if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+          } {
+            val queryParam = params(hashKey)
+            val duplicates = processDuplicates(queryParam, agg(hashKey))
+            duplicates.foreach { case (_, duplicate) =>
+              if (duplicate.score >= queryParam.threshold) {
+                seen += hashKey
+                edgeWithScoreLs += duplicate
+              }
+            }
+          }
+        }
 
-        aggregatedResults
+        val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
+        StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, degreeEdges = degrees)
       }
     }
   }
@@ -310,7 +308,7 @@ object Graph {
     element
   } recover {
     case e: Exception =>
-      logger.error(s"$e", e)
+      logger.error(s"[toElement]: $e", e)
       None
   } get
 
@@ -323,37 +321,33 @@ object Graph {
     toEdge(GraphUtil.split(s))
   }
 
-  //"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}"
-  //{"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":1417616431},
   def toEdge(parts: Array[String]): Option[Edge] = Try {
     val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) parts(6) else "{}"
+    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
     val tempDirection = if (parts.length >= 8) parts(7) else "out"
     val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
-
-    val edge = Management.toEdge(ts.toLong, operation, srcId, tgtId, label, direction, props)
-    //            logger.debug(s"toEdge: $edge")
-    Some(edge)
+    val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
+    Option(edge)
   } recover {
     case e: Exception =>
-      logger.error(s"toEdge: $e", e)
+      logger.error(s"[toEdge]: $e", e)
       throw e
   } get
 
-  //"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}"
   def toVertex(parts: Array[String]): Option[Vertex] = Try {
     val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) parts(6) else "{}"
-    Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, colName, props))
+    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
+    val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
+    Option(vertex)
   } recover {
     case e: Throwable =>
-      logger.error(s"toVertex: $e", e)
+      logger.error(s"[toVertex]: $e", e)
       throw e
   } get
 
-  def initStorage(config: Config)(ec: ExecutionContext) = {
+  def initStorage(graph: Graph, config: Config)(ec: ExecutionContext) = {
     config.getString("s2graph.storage.backend") match {
-      case "hbase" => new AsynchbaseStorage(config)(ec)
+      case "hbase" => new AsynchbaseStorage(graph, config)(ec)
       case _ => throw new RuntimeException("not supported storage.")
     }
   }
@@ -366,7 +360,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
   Model.loadCache()
 
   // TODO: Make storage client by config param
-  val storage = Graph.initStorage(config)(ec)
+  val storage = Graph.initStorage(this, config)(ec)
 
 
   for {
@@ -375,9 +369,11 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
   } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
 
   /** select */
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = storage.checkEdges(params)
+  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = storage.checkEdges(params)
+
+  def getEdges(q: Query): Future[StepResult] = storage.getEdges(q)
 
-  def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = storage.getEdges(q)
+  def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = storage.getEdgesMultiQuery(mq)
 
   def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
index 0898ffa..2f090cf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
@@ -31,6 +31,8 @@ object GraphExceptions {
 
   case class LabelAlreadyExistException(msg: String) extends Exception(msg)
 
+  case class LabelNameTooLongException(msg: String) extends Exception(msg)
+
   case class InternalException(msg: String) extends Exception(msg)
 
   case class IllegalDataTypeException(msg: String) extends Exception(msg)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 39020c7..9ef7c14 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core
 
-import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
+import org.apache.s2graph.core.GraphExceptions.{LabelNameTooLongException, InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.types.HBaseType._
@@ -48,9 +48,9 @@ object Management {
 
   import HBaseType._
 
+  val LABEL_NAME_MAX_LENGTH = 100
   val DefaultCompressionAlgorithm = "gz"
 
-
   def findService(serviceName: String) = {
     Service.findByName(serviceName, useCache = false)
   }
@@ -190,62 +190,6 @@ object Management {
     }
   }
 
-  def toEdge(ts: Long, operation: String, srcId: String, tgtId: String,
-             labelStr: String, direction: String = "", props: String): Edge = {
-
-    val label = tryOption(labelStr, getServiceLabel)
-    val dir =
-      if (direction == "")
-//        GraphUtil.toDirection(label.direction)
-        GraphUtil.directions("out")
-      else
-        GraphUtil.toDirection(direction)
-
-    //    logger.debug(s"$srcId, ${label.srcColumnWithDir(dir)}")
-    //    logger.debug(s"$tgtId, ${label.tgtColumnWithDir(dir)}")
-
-    val srcVertexId = toInnerVal(srcId, label.srcColumn.columnType, label.schemaVersion)
-    val tgtVertexId = toInnerVal(tgtId, label.tgtColumn.columnType, label.schemaVersion)
-
-    val srcColId = label.srcColumn.id.get
-    val tgtColId = label.tgtColumn.id.get
-    val (srcVertex, tgtVertex) = if (dir == GraphUtil.directions("out")) {
-      (Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()),
-        Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()))
-    } else {
-      (Vertex(SourceVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()),
-        Vertex(TargetVertexId(srcColId, srcVertexId), System.currentTimeMillis()))
-    }
-
-    //    val dir = if (direction == "") GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction)
-    val labelWithDir = LabelWithDirection(label.id.get, dir)
-    val op = tryOption(operation, GraphUtil.toOp)
-
-    val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
-    val parsedProps = toProps(label, jsObject.fields).toMap
-    val propsWithTs = parsedProps.map(kv => (kv._1 -> InnerValLikeWithTs(kv._2, ts))) ++
-      Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, label.schemaVersion), ts))
-
-    Edge(srcVertex, tgtVertex, labelWithDir, op, version = ts, propsWithTs = propsWithTs)
-
-  }
-
-  def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = {
-    Service.findByName(serviceName) match {
-      case None => throw new RuntimeException(s"$serviceName does not exist. create service first.")
-      case Some(service) =>
-        ServiceColumn.find(service.id.get, columnName) match {
-          case None => throw new RuntimeException(s"$columnName is not exist. create service column first.")
-          case Some(col) =>
-            val idVal = toInnerVal(id, col.columnType, col.schemaVersion)
-            val op = tryOption(operation, GraphUtil.toOp)
-            val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
-            val parsedProps = toProps(col, jsObject).toMap
-            Vertex(VertexId(col.id.get, idVal), ts, parsedProps, op = op)
-        }
-    }
-  }
-
   def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = {
 
     val props = for {
@@ -346,7 +290,7 @@ class Management(graph: Graph) {
                   schemaVersion: String = DEFAULT_VERSION,
                   isAsync: Boolean,
                   compressionAlgorithm: String = "gz",
-                  options: Option[String] = None): Try[Label] = {
+                  options: Option[String]): Try[Label] = {
 
     val labelOpt = Label.findByName(label, useCache = false)
 
@@ -355,7 +299,8 @@ class Management(graph: Graph) {
         case Some(l) =>
           throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
         case None =>
-          /* create all models */
+          /** create all models */
+          if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : 40 )")
           val newLabel = Label.insertAll(label,
             srcServiceName, srcColumnName, srcColumnType,
             tgtServiceName, tgtColumnName, tgtColumnType,
@@ -387,8 +332,8 @@ class Management(graph: Graph) {
    * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
    */
   def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
-    val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists."))
-    if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
+    val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
+    if (Label.findByName(newLabelName, useCache = false).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
 
     val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
     val allIndices = old.indices.map { index => Index(index.name, index.propNames) }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
index 0ecbf4e..eaadb39 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
@@ -49,8 +49,15 @@ object OrderingUtil {
         case (xv: Long, yv: Long) => implicitly[Ordering[Long]].compare(xv, yv)
         case (xv: Double, yv: Double) => implicitly[Ordering[Double]].compare(xv, yv)
         case (xv: String, yv: String) => implicitly[Ordering[String]].compare(xv, yv)
+        case (xv: BigDecimal, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(xv, yv)
         case (xv: JsValue, yv: JsValue) => implicitly[Ordering[JsValue]].compare(xv, yv)
         case (xv: InnerValLike, yv: InnerValLike) => implicitly[Ordering[InnerValLike]].compare(xv, yv)
+        case (xv: BigDecimal, yv: Long) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+        case (xv: Long, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
+        case (xv: BigDecimal, yv: Int) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+        case (xv: Int, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
+        case (xv: BigDecimal, yv: Double) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+        case (xv: Double, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 9ab08b3..7cc2420 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -20,11 +20,13 @@
 package org.apache.s2graph.core
 
 import org.apache.s2graph.core.GraphExceptions.BadQueryException
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
-import play.api.libs.json.{Json, _}
 import org.apache.s2graph.core.JSONParser._
-import scala.collection.mutable.ListBuffer
+import play.api.libs.json.{Json, _}
+
+import scala.collection.immutable
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 object PostProcess {
 
@@ -32,6 +34,7 @@ object PostProcess {
   type EDGE_VALUES = Map[String, JsValue]
   type ORDER_BY_VALUES =  (Any, Any, Any, Any)
   type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES)
+  type GROUP_BY_KEY = Map[String, JsValue]
 
   /**
    * Result Entity score field name
@@ -47,523 +50,153 @@ object PostProcess {
   val SCORE_FIELD_NAME = "scoreSum"
   val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props")
 
-  def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
-    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
-    //    filterNot {case (edge, score) => edge.props.contains(LabelMeta.degreeSeq)}
-    val groupedEdgesWithRank = (for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields))
-    } yield {
-      (queryRequest.queryParam, edge, score)
-    }).groupBy {
-      case (queryParam, edge, rank) if edge.labelWithDir.dir == GraphUtil.directions("in") =>
-        (queryParam.label.srcColumn, queryParam.label.label, queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree)
-      case (queryParam, edge, rank) =>
-        (queryParam.label.tgtColumn, queryParam.label.label, queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree)
-    }
 
-    val ret = for {
-      ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) <- groupedEdgesWithRank if !isDegreeEdge
-      edgesWithRanks = edgesAndRanks.groupBy(x => x._2.srcVertex).map(_._2.head)
-      id <- innerValToJsValue(target, tgtColumn.columnType)
-    } yield {
-      Json.obj("name" -> tgtColumn.columnName, "id" -> id,
-        SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum,
-        "label" -> labelName,
-        "aggr" -> Json.obj(
-          "name" -> srcColumn.columnName,
-          "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) =>
-            innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)
-          },
-          "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) =>
-            Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType),
-              "props" -> propsToJson(edge),
-              "score" -> rank
-            )
-          }
-        )
-      )
-    }
-
-    ret.toList
-  }
-
-  def sortWithFormatted(jsons: Seq[JsObject],
-                        scoreField: String = "scoreSum",
-                        queryRequestWithResultLs: Seq[QueryRequestWithResult],
-                        decrease: Boolean = true): JsObject = {
-    val ordering = if (decrease) -1 else 1
-    val sortedJsons = jsons.sortBy { jsObject => (jsObject \ scoreField).as[Double] * ordering }
-
-    if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
-    else Json.obj(
-      "size" -> sortedJsons.size,
-      "results" -> sortedJsons,
-      "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId()
-    )
-  }
-
-  private def toHashKey(edge: Edge, queryParam: QueryParam, fields: Seq[String], delimiter: String = ","): Int = {
-    val ls = for {
-      field <- fields
-    } yield {
-      field match {
-        case "from" | "_from" => edge.srcVertex.innerId.toIdString()
-        case "to" | "_to" => edge.tgtVertex.innerId.toIdString()
-        case "label" => edge.labelWithDir.labelId
-        case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
-        case "_timestamp" | "timestamp" => edge.ts
-        case _ =>
-          queryParam.label.metaPropsInvMap.get(field) match {
-            case None => throw new RuntimeException(s"unknow column: $field")
-            case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match {
-              case None => labelMeta.defaultValue
-              case Some(propVal) => propVal
-            }
-          }
-      }
+  def s2EdgeParent(graph: Graph,
+                   parentEdges: Seq[EdgeWithScore]): JsValue = {
+    if (parentEdges.isEmpty) JsNull
+    else {
+      val ancestors = for {
+        current <- parentEdges
+        parents = s2EdgeParent(graph, current.edge.parentEdges) if parents != JsNull
+      } yield {
+          val s2Edge = current.edge.originalEdgeOpt.getOrElse(current.edge)
+          s2EdgeToJsValue(s2Edge, current.score, false, parents = parents)
+        }
+      Json.toJson(ancestors)
     }
-    val ret = ls.hashCode()
-    ret
-  }
-
-  def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], isSrcVertex: Boolean = false): Seq[Int] = {
-    for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      q = queryRequest.query
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-    } yield  toHashKey(edge, queryRequest.queryParam, q.filterOutFields)
-  }
-
-  def summarizeWithListExcludeFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
-    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
-    sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs, decrease = true)
   }
 
-  def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
-    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
-    sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs)
-  }
-
-  def summarizeWithListFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
-    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
-    sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs)
-  }
-
-  def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult]): JsValue = {
-    toSimpleVertexArrJson(queryRequestWithResultLs, Seq.empty[QueryRequestWithResult])
-  }
-
-  private def orderBy(queryOption: QueryOption,
-                      rawEdges: ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]): ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)] = {
-    import OrderingUtil._
-
-    if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
-      val ascendingLs = queryOption.orderByColumns.map(_._2)
-      rawEdges.sortBy(_._3)(TupleMultiOrdering[Any](ascendingLs))
+  def s2EdgeToJsValue(s2Edge: Edge,
+                      score: Double,
+                      isDegree: Boolean = false,
+                      parents: JsValue = JsNull): JsValue = {
+    if (isDegree) {
+      Json.obj(
+        "from" -> anyValToJsValue(s2Edge.srcId),
+        "label" -> s2Edge.labelName,
+        LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degreeSeq).innerVal.value)
+      )
     } else {
-      rawEdges
+      Json.obj("from" -> anyValToJsValue(s2Edge.srcId),
+        "to" -> anyValToJsValue(s2Edge.tgtId),
+        "label" -> s2Edge.labelName,
+        "score" -> score,
+        "props" -> JSONParser.propertiesToJson(s2Edge.properties),
+        "direction" -> s2Edge.direction,
+        "timestamp" -> anyValToJsValue(s2Edge.ts),
+        "parents" -> parents
+      )
     }
   }
 
-  private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, edge: Edge, column: String): Any = {
-    column match {
-      case "score" => score
-      case "timestamp" | "_timestamp" => edge.ts
-      case _ =>
-        keyWithJs.get(column) match {
-          case None => keyWithJs.get("props").map { js => (js \ column).as[JsValue] }.get
-          case Some(x) => x
-        }
+  def withImpressionId(queryOption: QueryOption,
+                        size: Int,
+                        degrees: Seq[JsValue],
+                        results: Seq[JsValue]): JsValue = {
+    queryOption.impIdOpt match {
+      case None => Json.obj(
+        "size" -> size,
+        "degrees" -> degrees,
+        "results" -> results
+      )
+      case Some(impId) =>
+        Json.obj(
+          "size" -> size,
+          "degrees" -> degrees,
+          "results" -> results,
+          Experiment.ImpressionKey -> impId
+        )
     }
   }
+  def s2VertexToJson(s2Vertex: Vertex): Option[JsValue] = {
+    val props = for {
+      (k, v) <- s2Vertex.properties
+      jsVal <- anyValToJsValue(v)
+    } yield k -> jsVal
 
-  private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): JsValue = {
-    def traverse(js: JsValue): JsValue = js match {
-      case JsNull => mapper(JsNull)
-      case JsNumber(v) => mapper(js)
-      case JsString(v) => mapper(js)
-      case JsBoolean(v) => mapper(js)
-      case JsArray(elements) => JsArray(elements.map { t => traverse(mapper(t)) })
-      case JsObject(values) => JsObject(values.map { case (k, v) => k -> traverse(mapper(v)) })
+    for {
+      id <- anyValToJsValue(s2Vertex.innerIdVal)
+    } yield {
+      Json.obj(
+        "serviceName" -> s2Vertex.serviceName,
+        "columnName" -> s2Vertex.columnName,
+        "id" -> id,
+        "props" -> Json.toJson(props),
+        "timestamp" -> s2Vertex.ts
+      )
     }
-
-    traverse(jsValue)
   }
 
-  /** test query with filterOut is not working since it can not diffrentate filterOut */
-  private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): JsValue = {
-    val cursors = _cursors.flatten.iterator
+  def verticesToJson(s2Vertices: Seq[Vertex]): JsValue =
+    Json.toJson(s2Vertices.flatMap(s2VertexToJson(_)))
 
-    buildReplaceJson(jsonQuery) {
-      case js@JsObject(fields) =>
-        val isStep = fields.find { case (k, _) => k == "label" } // find label group
-        if (isStep.isEmpty) js
-        else {
-          // TODO: Order not ensured
-          val withCursor = js.fieldSet | Set("cursor" -> JsString(cursors.next))
-          JsObject(withCursor.toSeq)
-        }
-      case js => js
-    }
-  }
+  def withOptionalFields(queryOption: QueryOption,
+                         size: Int,
+                         degrees: Seq[JsValue],
+                         results: Seq[JsValue],
+                         failCount: Int = 0,
+                         cursors: => JsValue,
+                         nextQuery: => Option[JsValue]): JsValue = {
 
-  private def buildRawEdges(queryOption: QueryOption,
-                            queryRequestWithResultLs: Seq[QueryRequestWithResult],
-                            excludeIds: Map[Int, Boolean],
-                            scoreWeight: Double = 1.0): (ListBuffer[JsValue], ListBuffer[RAW_EDGE]) = {
-    val degrees = ListBuffer[JsValue]()
-    val rawEdges = ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]()
-    val metaPropNamesMap = scala.collection.mutable.Map[String, Int]()
-    for {
-      queryRequestWithResult <- queryRequestWithResultLs
-    } yield {
-      val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      queryRequest.queryParam.label.metaPropNames.foreach { metaPropName =>
-        metaPropNamesMap.put(metaPropName, metaPropNamesMap.getOrElse(metaPropName, 0) + 1)
-      }
-    }
-    val propsExistInAll = metaPropNamesMap.filter(kv => kv._2 == queryRequestWithResultLs.length)
-    val orderByColumns = queryOption.orderByColumns.filter { case (column, _) =>
-      column match {
-        case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => true
-        case _ =>
-          propsExistInAll.contains(column)
-//          //TODO??
-//          false
-//          queryParam.label.metaPropNames.contains(column)
-      }
-    }
+    val kvs = new ArrayBuffer[(String, JsValue)]()
 
-    /* build result jsons */
-    for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      queryParam = queryRequest.queryParam
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-      hashKey = toHashKey(edge, queryRequest.queryParam, queryOption.filterOutFields)
-      if !excludeIds.contains(hashKey)
-    } {
+    kvs.append("size" -> JsNumber(size))
+    kvs.append("degrees" -> JsArray(degrees))
+    kvs.append("results" -> JsArray(results))
 
-      // edge to json
-      val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
-      val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType)
-      if (edge.isDegree && fromOpt.isDefined) {
-        if (queryOption.limitOpt.isEmpty) {
-          degrees += Json.obj(
-            "from" -> fromOpt.get,
-            "label" -> queryRequest.queryParam.label.label,
-            "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir),
-            LabelMeta.degree.name -> innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG)
-          )
-        }
-      } else {
-        val keyWithJs = edgeToJson(edge, score, queryRequest.query, queryRequest.queryParam)
-        val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match {
-          case 0 =>
-            (None, None, None, None)
-          case 1 =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, None, None, None)
-          case 2 =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, v2, None, None)
-          case 3 =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, v2, v3, None)
-          case _ =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, v2, v3, v4)
-        }
+    if (queryOption.impIdOpt.isDefined) kvs.append(Experiment.ImpressionKey -> JsString(queryOption.impIdOpt.get))
 
-        val currentEdge = (keyWithJs, score * scoreWeight, orderByValues)
-        rawEdges += currentEdge
-      }
-    }
-    (degrees, rawEdges)
+    JsObject(kvs)
   }
 
-  private def buildResultJsValue(queryOption: QueryOption,
-                                 degrees: ListBuffer[JsValue],
-                                 rawEdges: ListBuffer[RAW_EDGE]): JsValue = {
-    if (queryOption.groupByColumns.isEmpty) {
-      // ordering
-      val filteredEdges = rawEdges.filter(t => t._2 >= queryOption.scoreThreshold)
+  def toJson(graph: Graph,
+             queryOption: QueryOption,
+             stepResult: StepResult): JsValue = {
 
-      val edges = queryOption.limitOpt match {
-        case None => orderBy(queryOption, filteredEdges).map(_._1)
-        case Some(limit) => orderBy(queryOption, filteredEdges).map(_._1).take(limit)
-      }
-      val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees
 
-      Json.obj(
-        "size" -> edges.size,
-        "degrees" -> resultDegrees,
-//        "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
-        "results" -> edges
-      )
-    } else {
-      val grouped = rawEdges.groupBy { case (keyWithJs, _, _) =>
-        val props = keyWithJs.get("props")
+    val degrees =
+      if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(t.s2Edge, t.score, true))
+      else emptyDegrees
 
-        for {
-          column <- queryOption.groupByColumns
-          value <- keyWithJs.get(column) match {
-            case None => props.flatMap { js => (js \ column).asOpt[JsValue] }
-            case Some(x) => Some(x)
-          }
-        } yield column -> value
+    if (queryOption.groupBy.keys.isEmpty) {
+      // no group by specified on query.
+
+      val ls = stepResult.results.map { t =>
+        val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
+        s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
       }
+      withImpressionId(queryOption, ls.size, degrees, ls)
+    } else {
 
-      val groupedEdgesWithScoreSum =
+      val results =
         for {
-          (groupByKeyVals, groupedRawEdges) <- grouped
-          scoreSum = groupedRawEdges.map(x => x._2).sum if scoreSum >= queryOption.scoreThreshold
+          (groupByValues, (scoreSum, edges)) <- stepResult.grouped
         } yield {
-          // ordering
-          val edges = orderBy(queryOption, groupedRawEdges).map(_._1)
+          val groupByKeyValues = queryOption.groupBy.keys.zip(groupByValues).map { case (k, valueOpt) =>
+            k -> valueOpt.flatMap(anyValToJsValue).getOrElse(JsNull)
+          }
+          val groupByValuesJson = Json.toJson(groupByKeyValues.toMap)
 
-          //TODO: refactor this
-          val js = if (queryOption.returnAgg)
+          if (!queryOption.returnAgg) {
             Json.obj(
-              "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+              "groupBy" -> groupByValuesJson,
               "scoreSum" -> scoreSum,
-              "agg" -> edges
+              "agg" -> Json.arr()
             )
-          else
+          } else {
+            val agg = edges.map { t =>
+              val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
+              s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
+            }
+            val aggJson = Json.toJson(agg)
             Json.obj(
-              "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+              "groupBy" -> groupByValuesJson,
               "scoreSum" -> scoreSum,
-              "agg" -> Json.arr()
+              "agg" -> aggJson
             )
-          (js, scoreSum)
-        }
-
-      val groupedSortedJsons = queryOption.limitOpt match {
-        case None =>
-          groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1)
-        case Some(limit) =>
-          groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1).take(limit)
-      }
-      val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees
-      Json.obj(
-        "size" -> groupedSortedJsons.size,
-        "degrees" -> resultDegrees,
-//        "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
-        "results" -> groupedSortedJsons
-      )
-    }
-  }
-
-  def toSimpleVertexArrJsonMulti(queryOption: QueryOption,
-                                 resultWithExcludeLs: Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])],
-                                 excludes: Seq[QueryRequestWithResult]): JsValue = {
-    val excludeIds = (Seq((Seq.empty, excludes)) ++ resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, excludes)) =>
-      acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap
-    }
-
-    val (degrees, rawEdges) = (ListBuffer.empty[JsValue], ListBuffer.empty[RAW_EDGE])
-    for {
-      (result, localExclude) <- resultWithExcludeLs
-    } {
-      val newResult = result.map { queryRequestWithResult =>
-        val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-        val newQuery = queryRequest.query.copy(queryOption = queryOption)
-        queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = newQuery))
-      }
-      val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, excludeIds)
-      degrees ++= _degrees
-      rawEdges ++= _rawEdges
-    }
-    buildResultJsValue(queryOption, degrees, rawEdges)
-  }
-
-  def toSimpleVertexArrJson(queryOption: QueryOption,
-                            queryRequestWithResultLs: Seq[QueryRequestWithResult],
-                            exclude: Seq[QueryRequestWithResult]): JsValue = {
-    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
-    val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds)
-    buildResultJsValue(queryOption, degrees, rawEdges)
-  }
-
-
-  def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult],
-                            exclude: Seq[QueryRequestWithResult]): JsValue = {
-
-    queryRequestWithResultLs.headOption.map { queryRequestWithResult =>
-      val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      val query = queryRequest.query
-      val queryOption = query.queryOption
-      val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
-      val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds)
-      buildResultJsValue(queryOption, degrees, rawEdges)
-    } getOrElse emptyResults
-  }
-
-  def verticesToJson(vertices: Iterable[Vertex]) = {
-    Json.toJson(vertices.flatMap { v => vertexToJson(v) })
-  }
-
-  def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
-    for {
-      (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
-      labelMeta <- queryParam.label.metaPropsMap.get(seq)
-      jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType)
-    } yield labelMeta.name -> jsValue
-  }
-
-  private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, queryParam: QueryParam): JsValue = {
-    if (parentEdges.isEmpty) {
-      JsNull
-    } else {
-      val parents = for {
-        parent <- parentEdges
-        (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get
-        parentQueryParam = QueryParam(parentEdge.labelWithDir)
-        parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if parents != JsNull
-      } yield {
-        val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge)
-        val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, parentQueryParam) + ("parents" -> parents)
-        Json.toJson(edgeJson)
-      }
-
-      Json.toJson(parents)
-    }
-  }
-
-  /** TODO */
-  def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
-    val (srcColumn, tgtColumn) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
-
-    val kvMapOpt = for {
-      from <- innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType)
-      to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType)
-    } yield {
-      val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props"
-      val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam)
-      val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap
-
-      val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, column) =>
-        val jsValue = column match {
-          case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - (System.currentTimeMillis() - queryParam.timestamp))
-          case "from" => from
-          case "to" => to
-          case "label" => JsString(queryParam.label.label)
-          case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
-          case "_timestamp" | "timestamp" => JsNumber(edge.ts)
-          case "score" => JsNumber(score)
-          case "props" if propsMap.nonEmpty => Json.toJson(propsMap)
-          case _ => JsNull
+          }
         }
-
-        if (jsValue == JsNull) map else map + (column -> jsValue)
-      }
-      kvMap
-    }
-
-    kvMapOpt.getOrElse(Map.empty)
-  }
-
-  def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
-    val kvs = edgeToJsonInner(edge, score, q, queryParam)
-    if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> Json.toJson(edgeParent(edge.parentEdges, q, queryParam)))
-    else kvs
-  }
-
-  def vertexToJson(vertex: Vertex): Option[JsObject] = {
-    val serviceColumn = ServiceColumn.findById(vertex.id.colId)
-
-    for {
-      id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType)
-    } yield {
-      Json.obj("serviceName" -> serviceColumn.service.serviceName,
-        "columnName" -> serviceColumn.columnName,
-        "id" -> id, "props" -> propsToJson(vertex),
-        "timestamp" -> vertex.ts,
-        //        "belongsTo" -> vertex.belongLabelIds)
-        "belongsTo" -> vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label)))
+      withImpressionId(queryOption, results.size, degrees, results)
     }
   }
-
-  private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, InnerValLike]) = {
-    for {
-      (seq, value) <- props
-      name <- seqsToNames.get(seq)
-    } yield (name, value)
-  }
-
-  private def propsToJson(vertex: Vertex) = {
-    val serviceColumn = vertex.serviceColumn
-    val props = for {
-      (propKey, innerVal) <- vertex.props
-      columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, propKey.toByte, useCache = true)
-      jsValue <- innerValToJsValue(innerVal, columnMeta.dataType)
-    } yield {
-      (columnMeta.name -> jsValue)
-    }
-    props.toMap
-  }
-
-  def propsToJson(edge: Edge) = {
-    for {
-      (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
-      metaProp <- edge.label.metaPropsMap.get(seq)
-      jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType)
-    } yield {
-      (metaProp.name, jsValue)
-    }
-  }
-
-  def summarizeWithListExclude(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = {
-    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
-
-
-    val groupedEdgesWithRank = (for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields))
-    } yield {
-      (edge, score)
-    }).groupBy { case (edge, score) =>
-      (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId)
-    }
-
-    val jsons = for {
-      ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank
-      (edges, ranks) = edgesAndRanks.groupBy(x => x._1.srcVertex).map(_._2.head).unzip
-      tgtId <- innerValToJsValue(target, tgtColumn.columnType)
-    } yield {
-      Json.obj(tgtColumn.columnName -> tgtId,
-        s"${srcColumn.columnName}s" ->
-          edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)), "scoreSum" -> ranks.sum)
-    }
-    val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ "scoreSum").as[Double] }.reverse
-    if (queryRequestWithResultLs.isEmpty) {
-      Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
-    } else {
-      Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons,
-        "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId())
-    }
-
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 28d78a9..ef40688 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core
 
 import com.google.common.hash.Hashing
 import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.parsers.{Where, WhereParser}
 import org.apache.s2graph.core.types.{HBaseSerializable, InnerVal, InnerValLike, LabelWithDirection}
@@ -51,6 +52,12 @@ object Query {
   }
 }
 
+object GroupBy {
+  val Empty = GroupBy()
+}
+case class GroupBy(keys: Seq[String] = Nil,
+                   limit: Int = Int.MaxValue)
+
 case class MultiQuery(queries: Seq[Query],
                       weights: Seq[Double],
                       queryOption: QueryOption,
@@ -58,7 +65,7 @@ case class MultiQuery(queries: Seq[Query],
 
 case class QueryOption(removeCycle: Boolean = false,
                        selectColumns: Seq[String] = Seq.empty,
-                       groupByColumns: Seq[String] = Seq.empty,
+                       groupBy: GroupBy = GroupBy.Empty,
                        orderByColumns: Seq[(String, Boolean)] = Seq.empty,
                        filterOutQuery: Option[Query] = None,
                        filterOutFields: Seq[String] = Seq(LabelMeta.to.name),
@@ -67,8 +74,11 @@ case class QueryOption(removeCycle: Boolean = false,
                        limitOpt: Option[Int] = None,
                        returnAgg: Boolean = true,
                        scoreThreshold: Double = Double.MinValue,
-                       returnDegree: Boolean = true)
-
+                       returnDegree: Boolean = true,
+                       impIdOpt: Option[String] = None) {
+  val orderByKeys = orderByColumns.map(_._1)
+  val ascendingVals = orderByColumns.map(_._2)
+}
 
 case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
                  steps: IndexedSeq[Step] = Vector.empty[Step],
@@ -77,8 +87,7 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
 
   val removeCycle = queryOption.removeCycle
   val selectColumns = queryOption.selectColumns
-//  val groupBy = queryOption.groupBy
-  val groupByColumns = queryOption.groupByColumns
+  val groupBy = queryOption.groupBy
   val orderByColumns = queryOption.orderByColumns
   val filterOutQuery = queryOption.filterOutQuery
   val filterOutFields = queryOption.filterOutFields
@@ -90,7 +99,7 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
 
   def cacheKeyBytes: Array[Byte] = {
     val selectBytes = Bytes.toBytes(queryOption.selectColumns.toString)
-    val groupBytes = Bytes.toBytes(queryOption.groupByColumns.toString)
+    val groupBytes = Bytes.toBytes(queryOption.groupBy.keys.toString)
     val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString)
     val filterOutBytes = queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte])
     val returnTreeBytes = Bytes.toBytes(queryOption.returnTree)
@@ -279,11 +288,23 @@ case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = S
     bytes
   }
 }
+case class S2Request(labelName: String,
+                     direction: String = "out",
+                     ts: Long = System.currentTimeMillis(),
+                     options: Map[String, Any] = Map.empty) {
+  val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+  val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+  val labelWithDir = LabelWithDirection(label.id.get, dir)
+  //TODO: need to merge options into queryParam.
+  val queryParam = QueryParam(labelWithDir, ts)
 
+}
 object QueryParam {
   lazy val Empty = QueryParam(LabelWithDirection(0, 0))
   lazy val DefaultThreshold = Double.MinValue
   val Delimiter = ","
+  val maxMetaByte = (-1).toByte
+  val fillArray = Array.fill(100)(maxMetaByte)
 }
 
 case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System.currentTimeMillis()) {
@@ -381,13 +402,13 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System
   }
 
   def limit(offset: Int, limit: Int): QueryParam = {
-    /* since degree info is located on first always */
-    if (offset == 0 && this.columnRangeFilter == null) {
-      this.limit = limit + 1
-      this.offset = offset
-    } else {
-      this.limit = limit
-      this.offset = offset + 1
+    /** since degree info is located on first always */
+    this.limit = limit
+    this.offset = offset
+
+    if (this.columnRangeFilter == null) {
+      if (offset == 0) this.limit = limit + 1
+      else this.offset = offset + 1
     }
     //    this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, this.offset)
     this
@@ -400,26 +421,24 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System
     }
   }
 
-  def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = {
-    //    val len = label.orderTypes.size.toByte
-    //    val len = label.extraIndicesMap(labelOrderSeq).sortKeyTypes.size.toByte
-    //    logger.error(s"indicesMap: ${label.indicesMap(labelOrderSeq)}")
-    val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
+  def paddingInterval(len: Byte, from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]) = {
+    val fromVal = Bytes.add(propsToBytes(from), QueryParam.fillArray)
+    val toVal = propsToBytes(to)
 
-    val minMetaByte = InnerVal.minMetaByte
-    //    val maxMetaByte = InnerVal.maxMetaByte
-    val maxMetaByte = -1.toByte
-    val toVal = Bytes.add(propsToBytes(to), Array.fill(1)(minMetaByte))
-    //FIXME
-    val fromVal = Bytes.add(propsToBytes(from), Array.fill(10)(maxMetaByte))
     toVal(0) = len
     fromVal(0) = len
-    val maxBytes = fromVal
-    val minBytes = toVal
+
+    val minMax = (toVal, fromVal) // inverted
+    minMax
+  }
+
+  def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = {
+    val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
+    val (minBytes, maxBytes) = paddingInterval(len, from, to)
+
     this.columnRangeFilterMaxBytes = maxBytes
     this.columnRangeFilterMinBytes = minBytes
-    val rangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true)
-    this.columnRangeFilter = rangeFilter
+    this.columnRangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true)
     this
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 5343659..5b2622f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -21,31 +21,42 @@ package org.apache.s2graph.core
 
 import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
 
-import scala.collection.Seq
+import scala.collection.mutable.ListBuffer
+import scala.collection.{Seq, mutable}
 
 object QueryResult {
-  def fromVertices(query: Query): Seq[QueryRequestWithResult] = {
+  def fromVertices(query: Query): StepInnerResult = {
     if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) {
-      Seq.empty
+      StepInnerResult.Empty
     } else {
       val queryParam = query.steps.head.queryParams.head
       val label = queryParam.label
       val currentTs = System.currentTimeMillis()
       val propsWithTs = Map(LabelMeta.timeStampSeq ->
         InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs))
-      for {
+      val edgeWithScoreLs = for {
         vertex <- query.vertices
       } yield {
-        val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
-        val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
-        QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam),
-          QueryResult(edgeWithScoreLs = Seq(edgeWithScore)))
-      }
+          val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
+          val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
+          edgeWithScore
+        }
+      StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, Nil, false)
     }
   }
 }
-case class QueryRequestWithResult(queryRequest: QueryRequest, queryResult: QueryResult)
+/** inner traverse */
+object StepInnerResult {
+  val Failure = StepInnerResult(Nil, Nil, true)
+  val Empty = StepInnerResult(Nil, Nil, false)
+}
+case class StepInnerResult(edgesWithScoreLs: Seq[EdgeWithScore],
+                           degreeEdges: Seq[EdgeWithScore],
+                           isFailure: Boolean = false) {
+  val isEmpty = edgesWithScoreLs.isEmpty
+}
 
 case class QueryRequest(query: Query,
                         stepIdx: Int,
@@ -59,3 +70,206 @@ case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil,
                        isFailure: Boolean = false)
 
 case class EdgeWithScore(edge: Edge, score: Double)
+
+
+
+
+
+/** result */
+
+object StepResult {
+
+  type Values = Seq[S2EdgeWithScore]
+  type GroupByKey = Seq[Option[Any]]
+  val EmptyOrderByValues = (None, None, None, None)
+  val Empty = StepResult(Nil, Nil, Nil)
+
+
+  def mergeOrdered(left: StepResult.Values,
+                   right: StepResult.Values,
+                   queryOption: QueryOption): (Double, StepResult.Values) = {
+    val merged = (left ++ right)
+    val scoreSum = merged.foldLeft(0.0) { case (prev, current) => prev + current.score }
+    if (scoreSum < queryOption.scoreThreshold) (0.0, Nil)
+    else {
+      val ordered = orderBy(queryOption, merged)
+      val filtered = ordered.take(queryOption.groupBy.limit)
+      val newScoreSum = filtered.foldLeft(0.0) { case (prev, current) => prev + current.score }
+      (newScoreSum, filtered)
+    }
+  }
+
+  def orderBy(queryOption: QueryOption, notOrdered: Values): Values = {
+    import OrderingUtil._
+
+    if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
+      notOrdered.sortBy(_.orderByValues)(TupleMultiOrdering[Any](queryOption.ascendingVals))
+    } else {
+      notOrdered
+    }
+  }
+  def toOrderByValues(s2Edge: Edge,
+                      score: Double,
+                      orderByKeys: Seq[String]): (Any, Any, Any, Any) = {
+    def toValue(propertyKey: String): Any = {
+      propertyKey match {
+        case "score" => score
+        case "timestamp" | "_timestamp" => s2Edge.ts
+        case _ => s2Edge.properties.get(propertyKey)
+      }
+    }
+    if (orderByKeys.isEmpty) (None, None, None, None)
+    else {
+      orderByKeys.length match {
+        case 1 =>
+          (toValue(orderByKeys(0)), None, None, None)
+        case 2 =>
+          (toValue(orderByKeys(0)), toValue(orderByKeys(1)), None, None)
+        case 3 =>
+          (toValue(orderByKeys(0)), toValue(orderByKeys(1)), toValue(orderByKeys(2)), None)
+        case _ =>
+          (toValue(orderByKeys(0)), toValue(orderByKeys(1)), toValue(orderByKeys(2)), toValue(orderByKeys(3)))
+      }
+    }
+  }
+  /**
+   * merge multiple StepResult into one StepResult.
+   * @param queryOption
+   * @param multiStepResults
+   * @return
+   */
+  def merges(queryOption: QueryOption,
+             multiStepResults: Seq[StepResult],
+             weights: Seq[Double] = Nil): StepResult = {
+    val degrees = multiStepResults.flatMap(_.degreeEdges)
+    val ls = new mutable.ListBuffer[S2EdgeWithScore]()
+    val agg= new mutable.HashMap[GroupByKey, ListBuffer[S2EdgeWithScore]]()
+    val sums = new mutable.HashMap[GroupByKey, Double]()
+
+    for {
+      (weight, eachStepResult) <- weights.zip(multiStepResults)
+      (ordered, grouped) = (eachStepResult.results, eachStepResult.grouped)
+    } {
+      ordered.foreach { t =>
+        val newScore = t.score * weight
+        ls += t.copy(score = newScore)
+      }
+
+      // process each query's stepResult's grouped
+      for {
+        (groupByKey, (scoreSum, values)) <- grouped
+      } {
+        val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore])
+        var scoreSum = 0.0
+        values.foreach { t =>
+          val newScore = t.score * weight
+          buffer += t.copy(score = newScore)
+          scoreSum += newScore
+        }
+        sums += (groupByKey -> scoreSum)
+      }
+    }
+
+    // process global groupBy
+    if (queryOption.groupBy.keys.nonEmpty) {
+      for {
+        s2EdgeWithScore <- ls
+        groupByKey = s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys)
+      } {
+        val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore])
+        buffer += s2EdgeWithScore
+        val newScore = sums.getOrElse(groupByKey, 0.0) + s2EdgeWithScore.score
+        sums += (groupByKey -> newScore)
+      }
+    }
+
+
+    val ordered = orderBy(queryOption, ls)
+    val grouped = for {
+      (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1)
+      aggregated = agg(groupByKey) if aggregated.nonEmpty
+    } yield groupByKey -> (scoreSum, aggregated)
+
+    StepResult(results = ordered, grouped = grouped, degrees)
+  }
+
+  //TODO: Optimize this.
+  def filterOut(graph: Graph,
+                queryOption: QueryOption,
+                baseStepResult: StepResult,
+                filterOutStepInnerResult: StepInnerResult): StepResult = {
+
+    val fields = if (queryOption.filterOutFields.isEmpty) Seq("to") else Seq("to")
+    //    else queryOption.filterOutFields
+    val filterOutSet = filterOutStepInnerResult.edgesWithScoreLs.map { t =>
+      t.edge.selectValues(fields)
+    }.toSet
+
+    val filteredResults = baseStepResult.results.filter { t =>
+      val filterOutKey = t.s2Edge.selectValues(fields)
+      !filterOutSet.contains(filterOutKey)
+    }
+
+    val grouped = for {
+      (key, (scoreSum, values)) <- baseStepResult.grouped
+      (out, in) = values.partition(v => filterOutSet.contains(v.s2Edge.selectValues(fields)))
+      newScoreSum = scoreSum - out.foldLeft(0.0) { case (prev, current) => prev + current.score } if in.nonEmpty
+    } yield key -> (newScoreSum, in)
+
+
+    StepResult(results = filteredResults, grouped = grouped, baseStepResult.degreeEdges)
+  }
+  def apply(graph: Graph,
+            queryOption: QueryOption,
+            stepInnerResult: StepInnerResult): StepResult = {
+        logger.debug(s"[BeforePostProcess]: ${stepInnerResult.edgesWithScoreLs.size}")
+
+    val results = for {
+      edgeWithScore <- stepInnerResult.edgesWithScoreLs
+    } yield {
+        val edge = edgeWithScore.edge
+        val orderByValues =
+          if (queryOption.orderByColumns.isEmpty) (edgeWithScore.score, None, None, None)
+          else toOrderByValues(edge, edgeWithScore.score, queryOption.orderByKeys)
+
+        S2EdgeWithScore(edge, edgeWithScore.score, orderByValues, edgeWithScore.edge.parentEdges)
+      }
+    /** ordered flatten result */
+    val ordered = orderBy(queryOption, results)
+
+    /** ordered grouped result */
+    val grouped =
+      if (queryOption.groupBy.keys.isEmpty) Nil
+      else {
+        val agg = new mutable.HashMap[GroupByKey, (Double, Values)]()
+        results.groupBy { s2EdgeWithScore =>
+          s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys, useToString = true)
+        }.map { case (k, ls) =>
+          val (scoreSum, merged) = mergeOrdered(ls, Nil, queryOption)
+          /**
+           * watch out here. by calling toString on Any, we lose type information which will be used
+           * later for toJson.
+           */
+          if (merged.nonEmpty) {
+            val newKey = merged.head.s2Edge.selectValues(queryOption.groupBy.keys, useToString = false)
+            agg += (newKey -> (scoreSum, merged))
+          }
+        }
+        agg.toSeq.sortBy(_._2._1 * -1)
+      }
+
+    val degrees = stepInnerResult.degreeEdges.map(t => S2EdgeWithScore(t.edge, t.score))
+    StepResult(results = ordered, grouped = grouped, degreeEdges = degrees)
+  }
+}
+
+case class S2EdgeWithScore(s2Edge: Edge,
+                           score: Double,
+                           orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues,
+                           parentEdges: Seq[EdgeWithScore] = Nil)
+
+case class StepResult(results: StepResult.Values,
+                      grouped: Seq[(StepResult.GroupByKey, (Double, StepResult.Values))],
+                      degreeEdges: StepResult.Values) {
+  val isEmpty = results.isEmpty
+}
\ No newline at end of file