You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 16:01:23 UTC

[2/3] incubator-s2graph git commit: [S2GRAPH-121]: Create `Result` class to hold traverse result edges.

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
index ede1127..0377bd8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
@@ -19,10 +19,53 @@
 
 package org.apache.s2graph.core
 
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId}
 import play.api.libs.json.Json
-
+//
+//object S2Vertex {
+//  def apply(graph: Graph, vertex: Vertex): S2Vertex = {
+//    S2Vertex(graph,
+//      vertex.serviceName,
+//      vertex.serviceColumn.columnName,
+//      vertex.innerIdVal,
+//      vertex.serviceColumn.innerValsToProps(vertex.props),
+//      vertex.ts,
+//      GraphUtil.fromOp(vertex.op)
+//    )
+//  }
+//}
+//
+//case class S2Vertex(graph: Graph,
+//                    serviceName: String,
+//                    columnName: String,
+//                    id: Any,
+//                    props: Map[String, Any] = Map.empty,
+//                    ts: Long = System.currentTimeMillis(),
+//                    operation: String = "insert") extends GraphElement {
+//  lazy val vertex = {
+//    val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+//    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+//    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+//
+//    val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion))
+//    val propsInner = column.propsToInnerVals(props) ++
+//      Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
+//
+//    Vertex(srcVertexId, ts, propsInner, op)
+//  }
+//
+//  val uniqueId = (serviceName, columnName, id)
+//
+//  override def isAsync: Boolean = vertex.isAsync
+//
+//  override def toLogString(): String = vertex.toLogString()
+//
+//  override def queueKey: String = vertex.queueKey
+//
+//  override def queuePartitionKey: String = vertex.queuePartitionKey
+//}
 case class Vertex(id: VertexId,
                   ts: Long = System.currentTimeMillis(),
                   props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
@@ -31,10 +74,19 @@ case class Vertex(id: VertexId,
 
   val innerId = id.innerId
 
+  val innerIdVal = innerId.value
+
+  lazy val properties = for {
+    (k, v) <- props
+    meta <- serviceColumn.metasMap.get(k)
+  } yield meta.name -> v.value
+
   def schemaVer = serviceColumn.schemaVersion
 
   def serviceColumn = ServiceColumn.findById(id.colId)
 
+  def columnName = serviceColumn.columnName
+
   def service = Service.findById(serviceColumn.serviceId)
 
   lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName)
@@ -114,7 +166,21 @@ object Vertex {
 
   def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue
 
-  //  val emptyVertex = Vertex(new CompositeId(CompositeId.defaultColId, CompositeId.defaultInnerId, false, true),
-  //    System.currentTimeMillis())
-  def fromString(s: String): Option[Vertex] = Graph.toVertex(s)
+  def toVertex(serviceName: String,
+            columnName: String,
+            id: Any,
+            props: Map[String, Any] = Map.empty,
+            ts: Long = System.currentTimeMillis(),
+            operation: String = "insert"): Vertex = {
+
+    val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+    val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion))
+    val propsInner = column.propsToInnerVals(props) ++
+      Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
+
+    new Vertex(srcVertexId, ts, propsInner, op)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
index 9bd172d..cd825f4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
@@ -25,7 +25,8 @@ import scalikejdbc._
 import scala.util.Random
 
 object Experiment extends Model[Experiment] {
-  val impressionKey = "S2-Impression-Id"
+  val ImpressionKey = "S2-Impression-Id"
+  val ImpressionId = "Impression-Id"
 
   def apply(rs: WrappedResultSet): Experiment = {
     Experiment(rs.intOpt("id"),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 2734211..f7318f6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -19,13 +19,15 @@
 
 package org.apache.s2graph.core.mysqls
 
+import java.util.Calendar
 
 import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
 import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.JSONParser._
-import play.api.libs.json._
+import play.api.libs.json.{JsObject, JsValue, Json}
 import scalikejdbc._
 
 object Label extends Model[Label] {
@@ -48,6 +50,7 @@ object Label extends Model[Label] {
     Label.delete(id.get)
   }
 
+
   def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
     val cacheKey = "label=" + labelName
     lazy val labelOpt =
@@ -292,11 +295,16 @@ case class Label(id: Option[Int], label: String,
   lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found"))
   lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found"))
 
-  lazy val direction = if (isDirected) "out" else "undirected"
   lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq)
 
   //TODO: Make sure this is correct
+
+//  lazy val metas = metas(useCache = true)
   lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
+  lazy val labelMetas = LabelMeta.findAllByLabelId(id.get, useCache = true)
+  lazy val labelMetaSet = labelMetas.toSet
+  lazy val labelMetaMap = (labelMetas ++ LabelMeta.reservedMetas).map(m => m.seq -> m).toMap
+
   lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap
   lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap
   lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap
@@ -327,14 +335,37 @@ case class Label(id: Option[Int], label: String,
     jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
   } yield prop.name -> jsValue).toMap
 
+  lazy val metaPropsDefaultMapInnerString = (for {
+    prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
+  } yield prop.name -> innerVal).toMap
+
   lazy val metaPropsDefaultMapInner = (for {
     prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
+  } yield prop -> innerVal).toMap
+  lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq
+  lazy val metaPropsJsValueWithDefault = (for {
+    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
     jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
-  } yield prop.name -> jsValue).toMap
+  } yield prop -> jsValue).toMap
+//  lazy val extraOptions = Model.extraOptions(Option("""{
+//    "storage": {
+//      "s2graph.storage.backend": "rocks",
+//      "rocks.db.path": "/tmp/db"
+//    }
+//  }"""))
 
   lazy val extraOptions: Map[String, JsValue] = options match {
     case None => Map.empty
-    case Some(v) => Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
+    case Some(v) =>
+      try {
+        Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
+      } catch {
+        case e: Exception =>
+          logger.error(s"An error occurs while parsing the extra label option: ${label}", e)
+          Map.empty
+      }
   }
 
   def srcColumnWithDir(dir: Int) = {
@@ -386,5 +417,23 @@ case class Label(id: Option[Int], label: String,
   )
 
 
+  def propsToInnerValsWithTs(props: Map[String, Any],
+                             ts: Long = System.currentTimeMillis()): Map[Byte, InnerValLikeWithTs] = {
+    for {
+      (k, v) <- props
+      labelMeta <- metaPropsInvMap.get(k)
+      innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
+    } yield labelMeta.seq -> InnerValLikeWithTs(innerVal, ts)
+
+  }
+
+  def innerValsWithTsToProps(props: Map[Byte, InnerValLikeWithTs]): Map[String, Any] = {
+    for {
+      (k, v) <- props
+      labelMeta <- metaPropsMap.get(k)
+    } yield {
+      labelMeta.name -> v.innerVal.value
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index b68fa79..6fceabc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -24,6 +24,8 @@ package org.apache.s2graph.core.mysqls
  */
 
 import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, InnerValLike}
 import play.api.libs.json.Json
 import scalikejdbc._
 object ServiceColumn extends Model[ServiceColumn] {
@@ -89,13 +91,40 @@ object ServiceColumn extends Model[ServiceColumn] {
     })
   }
 }
-case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) {
+case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String)  {
 
   lazy val service = Service.findById(serviceId)
   lazy val metas = ColumnMeta.findAllByColumn(id.get)
+  lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta } toMap
   lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap
   lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap
   lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
 
+  def propsToInnerVals(props: Map[String, Any]): Map[Int, InnerValLike] = {
+    for {
+      (k, v) <- props
+      labelMeta <- metasInvMap.get(k)
+      innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
+    } yield labelMeta.seq.toInt -> innerVal
+  }
+
+  def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = {
+    for {
+      (k, v) <- props
+      columnMeta <- metasMap.get(k)
+    } yield {
+      columnMeta.name -> v.value
+    }
+  }
+
+  def innerValsWithTsToProps(props: Map[Int, InnerValLikeWithTs]): Map[String, Any] = {
+    for {
+      (k, v) <- props
+      columnMeta <- metasMap.get(k)
+    } yield {
+      columnMeta.name -> v.innerVal.value
+    }
+  }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index 5920f3c..1c93667 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.parsers
 import org.apache.s2graph.core.GraphExceptions.WhereParserException
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.types.InnerValLike
-import org.apache.s2graph.core.{Edge, GraphExceptions, JSONParser}
+import org.apache.s2graph.core.Edge
 import org.apache.s2graph.core.JSONParser._
 import scala.annotation.tailrec
 import scala.util.Try

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 52ee50d..d77ac7d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -36,9 +36,11 @@ import scala.util.{Failure, Success, Try}
 object TemplateHelper {
   val findVar = """\"?\$\{(.*?)\}\"?""".r
   val num = """(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r
+
   val hour = 60 * 60 * 1000L
   val day = hour * 24L
   val week = day * 7L
+
   def calculate(now: Long, n: Int, unit: String): Long = {
     val duration = unit match {
       case "hour" | "HOUR" => n * hour
@@ -72,12 +74,32 @@ object TemplateHelper {
   }
 }
 
-class RequestParser(config: Config) {
+object RequestParser {
+  type ExperimentParam = (JsObject, String, String, String, Option[String])
+  val defaultLimit = 100
+
+  def toJsValues(jsValue: JsValue): List[JsValue] = {
+    jsValue match {
+      case obj: JsObject => List(obj)
+      case arr: JsArray => arr.as[List[JsValue]]
+      case _ => List.empty[JsValue]
+    }
+  }
+
+  def jsToStr(js: JsValue): String = js match {
+    case JsString(s) => s
+    case _ => js.toString()
+  }
+
+}
+
+class RequestParser(graph: Graph) {
 
   import Management.JsonModel._
+  import RequestParser._
 
-  val hardLimit = 100000
-  val defaultLimit = 100
+  val config = graph.config
+  val hardLimit = config.getInt("query.hardlimit")
   val maxLimit = Int.MaxValue - 1
   val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout")
   val DefaultMaxAttempt = config.getInt("hbase.client.retries.number")
@@ -106,13 +128,11 @@ class RequestParser(config: Config) {
         (labelOrderType.seq, value)
       }
     }
+
     ret
   }
 
-  def extractInterval(label: Label, _jsValue: JsValue) = {
-    val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString())
-    val jsValue = Json.parse(replaced)
-
+  def extractInterval(label: Label, jsValue: JsValue) = {
     def extractKv(js: JsValue) = js match {
       case JsObject(map) => map.toSeq
       case JsArray(arr) => arr.flatMap {
@@ -135,15 +155,21 @@ class RequestParser(config: Config) {
     ret
   }
 
-  def extractDuration(label: Label, _jsValue: JsValue) = {
-    val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString())
-    val jsValue = Json.parse(replaced)
-
+  def extractDuration(label: Label, jsValue: JsValue) = {
     for {
       js <- parseOption[JsObject](jsValue, "duration")
     } yield {
-      val minTs = parseOption[Long](js, "from").getOrElse(Long.MaxValue)
-      val maxTs = parseOption[Long](js, "to").getOrElse(Long.MinValue)
+      val minTs = (js \ "from").get match {
+        case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong
+        case JsNumber(n) => n.toLong
+        case _ => Long.MinValue
+      }
+
+      val maxTs = (js \ "to").get match {
+        case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong
+        case JsNumber(n) => n.toLong
+        case _ => Long.MaxValue
+      }
 
       if (minTs > maxTs) {
         throw new BadQueryException("Duration error. Timestamp of From cannot be larger than To.")
@@ -167,21 +193,24 @@ class RequestParser(config: Config) {
     }
     ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike])
   }
-  
+
   def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = {
     whereClauseOpt match {
       case None => Success(WhereParser.success)
-      case Some(_where) =>
-        val where = TemplateHelper.replaceVariable(System.currentTimeMillis(), _where)
+      case Some(where) =>
         val whereParserKey = s"${label.label}_${where}"
+
         parserCache.get(whereParserKey, new Callable[Try[Where]] {
           override def call(): Try[Where] = {
-            WhereParser(label).parse(where) match {
+            val _where = TemplateHelper.replaceVariable(System.currentTimeMillis(), where)
+
+            WhereParser(label).parse(_where) match {
               case s@Success(_) => s
               case Failure(ex) => throw BadQueryException(ex.getMessage, ex)
             }
           }
         })
+
     }
   }
 
@@ -194,28 +223,38 @@ class RequestParser(config: Config) {
     } yield {
       Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis())
     }
-    vertices.toSeq
+
+    vertices
   }
 
-  def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery = {
+  def toMultiQuery(jsValue: JsValue, impIdOpt: Option[String]): MultiQuery = {
     val queries = for {
       queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty)
     } yield {
-      toQuery(queryJson, isEdgeQuery)
+      toQuery(queryJson, impIdOpt = impIdOpt)
     }
     val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0))
-    MultiQuery(queries = queries, weights = weights,
-      queryOption = toQueryOption(jsValue), jsonQuery = jsValue)
+    MultiQuery(queries = queries, weights = weights, queryOption = toQueryOption(jsValue, impIdOpt))
   }
 
-  def toQueryOption(jsValue: JsValue): QueryOption = {
+  def toQueryOption(jsValue: JsValue, impIdOpt: Option[String]): QueryOption = {
     val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name))
-    val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q =>
+    val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v, impIdOpt = impIdOpt) }.map { q =>
       q.copy(queryOption = q.queryOption.copy(filterOutFields = filterOutFields))
     }
     val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true)
     val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty)
-    val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty)
+//    val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty)
+    val groupBy = (jsValue \ "groupBy").asOpt[JsValue].getOrElse(JsNull) match {
+      case obj: JsObject =>
+        val keys = (obj \ "key").asOpt[Seq[String]].getOrElse(Nil)
+        val groupByLimit = (obj \ "limit").asOpt[Int].getOrElse(hardLimit)
+        GroupBy(keys, groupByLimit)
+      case arr: JsArray =>
+        val keys = arr.asOpt[Seq[String]].getOrElse(Nil)
+        GroupBy(keys)
+      case _ => GroupBy.Empty
+    }
     val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs =>
       for {
         js <- jsLs
@@ -238,7 +277,7 @@ class RequestParser(config: Config) {
 
     QueryOption(removeCycle = removeCycle,
       selectColumns = selectColumns,
-      groupByColumns = groupByColumns,
+      groupBy = groupBy,
       orderByColumns = orderByColumns,
       filterOutQuery = filterOutQuery,
       filterOutFields = filterOutFields,
@@ -247,10 +286,12 @@ class RequestParser(config: Config) {
       limitOpt = limitOpt,
       returnAgg = returnAgg,
       scoreThreshold = scoreThreshold,
-      returnDegree = returnDegree
+      returnDegree = returnDegree,
+      impIdOpt = impIdOpt
     )
   }
-  def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = {
+
+  def toQuery(jsValue: JsValue, impIdOpt: Option[String]): Query = {
     try {
       val vertices =
         (for {
@@ -274,7 +315,7 @@ class RequestParser(config: Config) {
       if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty")
       val steps = parse[Vector[JsValue]](jsValue, "steps")
 
-      val queryOption = toQueryOption(jsValue)
+      val queryOption = toQueryOption(jsValue, impIdOpt)
 
       val querySteps =
         steps.zipWithIndex.map { case (step, stepIdx) =>
@@ -332,7 +373,7 @@ class RequestParser(config: Config) {
 
         }
 
-      val ret = Query(vertices, querySteps, queryOption, jsValue)
+      val ret = Query(vertices, querySteps, queryOption)
       //      logger.debug(ret.toString)
       ret
     } catch {
@@ -354,10 +395,8 @@ class RequestParser(config: Config) {
       val limit = {
         parseOption[Int](labelGroup, "limit") match {
           case None => defaultLimit
-          case Some(l) if l < 0 => maxLimit
-          case Some(l) if l >= 0 =>
-            val default = hardLimit
-            Math.min(l, default)
+          case Some(l) if l < 0 => hardLimit
+          case Some(l) if l >= 0 => Math.min(l, hardLimit)
         }
       }
       val offset = parseOption[Int](labelGroup, "offset").getOrElse(0)
@@ -405,7 +444,6 @@ class RequestParser(config: Config) {
       // FIXME: Order of command matter
       QueryParam(labelWithDir)
         .sample(sample)
-        .limit(offset, limit)
         .rank(RankParam(label.id.get, scoring))
         .exclude(exclude)
         .include(include)
@@ -413,6 +451,7 @@ class RequestParser(config: Config) {
         .has(hasFilter)
         .labelOrderSeq(indexSeq)
         .interval(interval)
+        .limit(offset, limit)
         .where(where)
         .duplicatePolicy(duplicate)
         .includeDegree(includeDegree)
@@ -450,21 +489,8 @@ class RequestParser(config: Config) {
     }
   }
 
-  def toJsValues(jsValue: JsValue): List[JsValue] = {
-    jsValue match {
-      case obj: JsObject => List(obj)
-      case arr: JsArray => arr.as[List[JsValue]]
-      case _ => List.empty[JsValue]
-    }
-  }
-
-  def jsToStr(js: JsValue): String = js match {
-    case JsString(s) => s
-    case _ => js.toString()
-  }
-
   def parseBulkFormat(str: String): Seq[(GraphElement, String)] = {
-    val edgeStrs = str.split("\\n")
+    val edgeStrs = str.split("\\n").filterNot(_.isEmpty)
     val elementsWithTsv = for {
       edgeStr <- edgeStrs
       str <- GraphUtil.parseString(edgeStr)
@@ -480,24 +506,23 @@ class RequestParser(config: Config) {
   }
 
   private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, String)] = {
-    val srcId = (jsValue \ "from").asOpt[JsValue].map(jsToStr)
-    val tgtId = (jsValue \ "to").asOpt[JsValue].map(jsToStr)
-    val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(jsToStr)) ++ srcId
-    val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(tos => tos.map(jsToStr)) ++ tgtId
+    val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil)
+    val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil)
 
     val label = parse[String](jsValue, "label")
     val timestamp = parse[Long](jsValue, "timestamp")
-    val direction = parseOption[String](jsValue, "direction").getOrElse("")
-    val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
+    val direction = parseOption[String](jsValue, "direction").getOrElse("out")
+    val propsJson = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
 
     for {
-      srcId <- srcIds
-      tgtId <- tgtIds
+      srcId <- srcIds.flatMap(jsValueToAny(_).toSeq)
+      tgtId <- tgtIds.flatMap(jsValueToAny(_).toSeq)
     } yield {
-      val edge = Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString)
+      //      val edge = Management.toEdge(graph, timestamp, operation, srcId, tgtId, label, direction, fromJsonToProperties(propsJson))
+      val edge = Edge.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation)
       val tsv = (jsValue \ "direction").asOpt[String] match {
-        case None => Seq(timestamp, operation, "e", srcId, tgtId, label, props).mkString("\t")
-        case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, props, dir).mkString("\t")
+        case None => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString).mkString("\t")
+        case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString, dir).mkString("\t")
       }
 
       (edge, tsv)
@@ -513,8 +538,8 @@ class RequestParser(config: Config) {
     val ts = parseOption[Long](jsValue, "timestamp").getOrElse(System.currentTimeMillis())
     val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get
     val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get
-    val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
-    Management.toVertex(ts, operation, id.toString, sName, cName, props.toString)
+    val props = fromJsonToProperties((jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()))
+    Vertex.toVertex(sName, cName, id.toString, props, ts, operation)
   }
 
   def toPropElements(jsObj: JsValue) = Try {
@@ -637,7 +662,7 @@ class RequestParser(config: Config) {
 
   def toDeleteParam(json: JsValue) = {
     val labelName = (json \ "label").as[String]
-    val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil).filterNot(_.isAsync)
+    val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil)
     val direction = (json \ "direction").asOpt[String].getOrElse("out")
 
     val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
@@ -645,4 +670,30 @@ class RequestParser(config: Config) {
     val vertices = toVertices(labelName, direction, ids)
     (labels, direction, ids, ts, vertices)
   }
+
+  def toFetchAndDeleteParam(json: JsValue) = {
+    val labelName = (json \ "label").as[String]
+    val fromOpt = (json \ "from").asOpt[JsValue]
+    val toOpt = (json \ "to").asOpt[JsValue]
+    val direction = (json \ "direction").asOpt[String].getOrElse("out")
+    val indexOpt = (json \ "index").asOpt[String]
+    val propsOpt = (json \ "props").asOpt[JsObject]
+    (labelName, fromOpt, toOpt, direction, indexOpt, propsOpt)
+  }
+
+  def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = jsQuery.as[Seq[JsObject]].map { obj =>
+    def _require(field: String) = throw new RuntimeException(s"${field} not found")
+
+    val accessToken = (obj \ "accessToken").asOpt[String].getOrElse(_require("accessToken"))
+    val experimentName = (obj \ "experiment").asOpt[String].getOrElse(_require("experiment"))
+    val uuid = (obj \ "#uuid").get match {
+      case JsString(s) => s
+      case JsNumber(n) => n.toString
+      case _ => _require("#uuid")
+    }
+    val body = (obj \ "params").asOpt[JsObject].getOrElse(Json.obj())
+    val impKeyOpt = (obj \ Experiment.ImpressionKey).asOpt[String]
+
+    (body, accessToken, experimentName, uuid, impKeyOpt)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 55b3e79..4c77ad6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -21,7 +21,8 @@ package org.apache.s2graph.core.rest
 
 import java.net.URL
 
-import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
 import org.apache.s2graph.core.utils.logger
@@ -29,8 +30,21 @@ import play.api.libs.json._
 
 import scala.concurrent.{ExecutionContext, Future}
 
-
 object RestHandler {
+  trait CanLookup[A] {
+    def lookup(m: A, key: String): Option[String]
+  }
+
+  object CanLookup {
+   implicit val oneTupleLookup = new CanLookup[(String, String)] {
+      override def lookup(m: (String, String), key: String) =
+        if (m._1 == key) Option(m._2) else None
+    }
+    implicit val hashMapLookup = new CanLookup[Map[String, String]] {
+      override def lookup(m: Map[String, String], key: String): Option[String] = m.get(key)
+    }
+  }
+
   case class HandlerResult(body: Future[JsValue], headers: (String, String)*)
 }
 
@@ -41,25 +55,31 @@ object RestHandler {
 class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
 
   import RestHandler._
-  val requestParser = new RequestParser(graph.config)
+  val requestParser = new RequestParser(graph)
+
 
   /**
     * Public APIS
     */
-  def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): HandlerResult = {
+  def doPost[A](uri: String, body: String, headers: A)(implicit ev: CanLookup[A]): HandlerResult = {
+    val impKeyOpt = ev.lookup(headers, Experiment.ImpressionKey)
+    val impIdOpt = ev.lookup(headers, Experiment.ImpressionId)
+
     try {
       val jsQuery = Json.parse(body)
 
       uri match {
-        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
-        case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
-        case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
-        case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+//        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toSimpleVertexArrJson))
+        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toJson))
+//        case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
+//        case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
+//        case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
         case "/graphs/checkEdges" => checkEdges(jsQuery)
-        case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
-        case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
-        case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+//        case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
+//        case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
+//        case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
         case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery))
+        case "/graphs/experiments" => experiments(jsQuery)
         case uri if uri.startsWith("/graphs/experiment") =>
           val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3)
           experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt)
@@ -75,17 +95,8 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
     try {
       val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue)
 
-      HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs =>
-        val edgeJsons = for {
-          queryRequestWithResult <- queryRequestWithResultLs
-          (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-          edgeWithScore <- queryResult.edgeWithScoreLs
-          (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-          convertedEdge = if (isReverted) edge.duplicateEdge else edge
-          edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
-        } yield Json.toJson(edgeJson)
-
-        Json.toJson(edgeJsons)
+      HandlerResult(graph.checkEdges(quads).map { case stepResult =>
+        PostProcess.toJson(graph, QueryOption(), stepResult)
       })
     } catch {
       case e: Exception => HandlerResult(Future.failed(e))
@@ -93,11 +104,23 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
   }
 
 
-  /**
-    * Private APIS
-    */
-  private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String]): HandlerResult = {
+  private def experiments(jsQuery: JsValue): HandlerResult = {
+    val params: Seq[RequestParser.ExperimentParam] = requestParser.parseExperiment(jsQuery)
+
+    val results = params map { case (body, token, experimentName, uuid, impKeyOpt) =>
+      val handlerResult = experiment(body, token, experimentName, uuid, impKeyOpt)
+      val future = handlerResult.body.recover {
+        case e: Exception => PostProcess.emptyResults ++ Json.obj("error" -> Json.obj("reason" -> e.getMessage))
+      }
+
+      future
+    }
+
+    val result = Future.sequence(results).map(JsArray)
+    HandlerResult(body = result)
+  }
 
+  private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String] = None): HandlerResult = {
     try {
       val bucketOpt = for {
         service <- Service.findByAccessToken(accessToken)
@@ -108,7 +131,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
       val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found"))
       if (bucket.isGraphQuery) {
         val ret = buildRequestInner(contentsBody, bucket, uuid)
-        HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId)
+        HandlerResult(ret.body, Experiment.ImpressionKey -> bucket.impressionId)
       }
       else throw new RuntimeException("not supported yet")
     } catch {
@@ -127,119 +150,54 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
       val experimentLog = s"POST $path took -1 ms 200 -1 $body"
       logger.debug(experimentLog)
 
-      doPost(path, body)
-    }
-  }
-
-  private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
-    val filterOutQueryResultsLs = q.filterOutQuery match {
-      case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
-      case None => Future.successful(Seq.empty)
-    }
-
-    for {
-      queryResultsLs <- graph.getEdges(q)
-      filterOutResultsLs <- filterOutQueryResultsLs
-    } yield {
-      val json = post(queryResultsLs, filterOutResultsLs)
-      json
+      doPost(path, body, Experiment.ImpressionId -> bucket.impressionId)
     }
   }
 
-  def getEdgesAsync(jsonQuery: JsValue)
-                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-
-    val fetch = eachQuery(post) _
+  def getEdgesAsync(jsonQuery: JsValue, impIdOpt: Option[String] = None)
+                   (post: (Graph, QueryOption, StepResult) => JsValue): Future[JsValue] = {
     jsonQuery match {
-      case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
       case obj@JsObject(_) =>
         (obj \ "queries").asOpt[JsValue] match {
-          case None => fetch(requestParser.toQuery(obj))
+          case None =>
+            val query = requestParser.toQuery(obj, impIdOpt)
+            graph.getEdges(query).map(post(graph, query.queryOption, _))
           case _ =>
-            val multiQuery = requestParser.toMultiQuery(obj)
-            val filterOutFuture = multiQuery.queryOption.filterOutQuery match {
-              case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
-              case None => Future.successful(Seq.empty)
-            }
-            val futures = multiQuery.queries.zip(multiQuery.weights).map { case (query, weight) =>
-              val filterOutQueryResultsLs = query.queryOption.filterOutQuery match {
-                case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
-                case None => Future.successful(Seq.empty)
-              }
-              for {
-                queryRequestWithResultLs <- graph.getEdges(query)
-                filterOutResultsLs <- filterOutQueryResultsLs
-              } yield {
-                val newQueryRequestWithResult = for {
-                  queryRequestWithResult <- queryRequestWithResultLs
-                  queryResult = queryRequestWithResult.queryResult
-                } yield {
-                  val newEdgesWithScores = for {
-                    edgeWithScore <- queryRequestWithResult.queryResult.edgeWithScoreLs
-                  } yield {
-                    edgeWithScore.copy(score = edgeWithScore.score * weight)
-                  }
-                  queryRequestWithResult.copy(queryResult = queryResult.copy(edgeWithScoreLs = newEdgesWithScores))
-                }
-                logger.debug(s"[Size]: ${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}")
-                (newQueryRequestWithResult, filterOutResultsLs)
-              }
-            }
-            for {
-              filterOut <- filterOutFuture
-              resultWithExcludeLs <- Future.sequence(futures)
-            } yield {
-              PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, resultWithExcludeLs, filterOut)
-              //              val initial = (ListBuffer.empty[QueryRequestWithResult], ListBuffer.empty[QueryRequestWithResult])
-              //              val (results, excludes) = resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), (results, excludes)) =>
-              //                (prevResults ++= results, prevExcludes ++= excludes)
-              //              }
-              //              PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ filterOut)
-            }
+            val multiQuery = requestParser.toMultiQuery(obj, impIdOpt)
+            graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _))
         }
-      case _ => throw BadQueryException("Cannot support")
-    }
-  }
-
-  private def getEdgesExcludedAsync(jsonQuery: JsValue)
-                                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-    val q = requestParser.toQuery(jsonQuery)
-    val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
 
-    val fetchFuture = graph.getEdges(q)
-    val excludeFuture = graph.getEdges(filterOutQuery)
+      case JsArray(arr) =>
+        val queries = arr.map(requestParser.toQuery(_, impIdOpt))
+        val weights = queries.map(_ => 1.0)
+        val multiQuery = MultiQuery(queries, weights, QueryOption(), jsonQuery)
+        graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _))
 
-    for {
-      queryResultLs <- fetchFuture
-      exclude <- excludeFuture
-    } yield {
-      post(queryResultLs, exclude)
+      case _ => throw BadQueryException("Cannot support")
     }
   }
 
   private def getVertices(jsValue: JsValue) = {
     val jsonQuery = jsValue
-    val ts = System.currentTimeMillis()
-    val props = "{}"
 
     val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
       val serviceName = (js \ "serviceName").as[String]
       val columnName = (js \ "columnName").as[String]
-      for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
-        Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
+      for {
+        idJson <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])
+        id <- jsValueToAny(idJson)
+      } yield {
+        Vertex.toVertex(serviceName, columnName, id)
       }
     }
 
     graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
   }
 
+
   private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = {
     var body = bucket.requestBody.replace("#uuid", uuid)
 
-    //    // replace variable
-    //    body = TemplateHelper.replaceVariable(System.currentTimeMillis(), body)
-
-    // replace param
     for {
       requestKeyJson <- requestKeyJsonOpt
       jsObj <- requestKeyJson.asOpt[JsObject]

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index eaa25af..a6e81b4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -19,12 +19,10 @@
 
 package org.apache.s2graph.core.storage
 
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.{Executors, TimeUnit}
 
 import com.typesafe.config.Config
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.s2graph.core.ExceptionHandler.{KafkaMessage, Key, Val}
 import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
@@ -37,13 +35,15 @@ import org.apache.s2graph.core.utils.{Extensions, logger}
 import scala.annotation.tailrec
 import scala.collection.Seq
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Promise, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.{Random, Try}
 
-abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
+abstract class Storage[R](val graph: Graph,
+                          val config: Config)(implicit ec: ExecutionContext) {
   import HBaseType._
 
   /** storage dependent configurations */
+  val DeleteAllFetchCount = config.getInt("delete.all.fetch.count")
   val MaxRetryNum = config.getInt("max.retry.number")
   val MaxBackOff = config.getInt("max.back.off")
   val BackoffTimeout = config.getInt("back.off.timeout")
@@ -57,11 +57,15 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
   /** retry scheduler */
   val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
 
+
   /** handle mutate failed */
   val exceptionHandler = new ExceptionHandler(config)
-
   val failTopic = s"mutateFailed_${config.getString("phase")}"
 
+  /** fallback */
+  val fallback = Future.successful(StepResult.Empty)
+  val innerFallback = Future.successful(StepInnerResult.Empty)
+
   /**
    * Compatibility table
    * | label schema version | snapshot edge | index edge | vertex | note |
@@ -229,7 +233,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
    * @return
    */
   def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)],
-              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]]
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepInnerResult]]
 
   /**
    * fetch Vertex for given request from storage.
@@ -324,7 +328,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
 
   def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = {
     val (strongEdges, weakEdges) =
-      edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk"))
+      (edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")))
 
     val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map { case (zkQuorum, edges) =>
       val mutations = edges.flatMap { edge =>
@@ -449,7 +453,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
       future recoverWith {
         case FetchTimeoutException(retryEdge) =>
           logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
-          /* fetch failed. re-fetch should be done */
+          /** fetch failed. re-fetch should be done */
           fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
             retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
           }
@@ -465,14 +469,14 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
           }
           logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
 
-          /* retry logic */
+          /** retry logic */
           val promise = Promise[Boolean]
           val backOff = exponentialBackOff(tryNum)
           scheduledThreadPool.schedule(new Runnable {
             override def run(): Unit = {
               val future = if (failedStatusCode == 0) {
                 // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
-                /* fetch failed. re-fetch should be done */
+                /** fetch failed. re-fetch should be done */
                 fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
                   retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
                 }
@@ -505,7 +509,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
       case 0 =>
         fetchedSnapshotEdgeOpt match {
           case None =>
-          /*
+          /**
            * no one has never mutated this SN.
            * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
            * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1)
@@ -527,7 +531,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
           case Some(snapshotEdge) =>
             snapshotEdge.pendingEdgeOpt match {
               case None =>
-                /*
+                /**
                  * others finished commit on this SN. but there is no contention.
                  * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges)
                  * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ?
@@ -549,7 +553,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
               case Some(pendingEdge) =>
                 val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
                 if (isLockExpired) {
-                  /*
+                  /**
                    * if pendingEdge.ts == snapshotEdge.ts =>
                    *    (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge))
                    * else =>
@@ -571,7 +575,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
 
                   commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
                 } else {
-                  /*
+                  /**
                    * others finished commit on this SN and there is currently contention.
                    * this can't be proceed so retry from re-fetch.
                    * throw EX
@@ -584,11 +588,11 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
         }
       case _ =>
 
-        /*
+        /**
          * statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock
          */
 
-        /*
+        /**
          * this succeed to lock this SN. keep doing on commit process.
          * if SN.isEmpty =>
          * no one never succed to commit on this SN.
@@ -828,90 +832,96 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
 
 
   /** Delete All */
-  protected def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest,
-                                              queryResult: QueryResult,
+  protected def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepInnerResult,
                                               requestTs: Long,
                                               retryNum: Int): Future[Boolean] = {
-    val queryParam = queryRequest.queryParam
-    val zkQuorum = queryParam.label.hbaseZkAddr
-    val futures = for {
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-    } yield {
-        /* reverted direction */
-        val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
-          indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
-            buildIncrementsAsync(indexEdge, -1L)
-        }
-        val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-        val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
-          indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
-            buildIncrementsAsync(indexEdge, -1L)
+    if (stepInnerResult.isEmpty) Future.successful(true)
+    else {
+      val head = stepInnerResult.edgesWithScoreLs.head
+      val zkQuorum = head.edge.label.hbaseZkAddr
+      val futures = for {
+        edgeWithScore <- stepInnerResult.edgesWithScoreLs
+        (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+      } yield {
+          /** reverted direction */
+          val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+            indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+              buildIncrementsAsync(indexEdge, -1L)
+          }
+          val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+          val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
+            indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+              buildIncrementsAsync(indexEdge, -1L)
+          }
+          val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+          writeToStorage(zkQuorum, mutations, withWait = true)
         }
-        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-        writeToStorage(zkQuorum, mutations, withWait = true)
-      }
 
-    Future.sequence(futures).map { rets => rets.forall(identity) }
+      Future.sequence(futures).map { rets => rets.forall(identity) }
+    }
   }
 
-  protected def buildEdgesToDelete(queryRequestWithResultLs: QueryRequestWithResult, requestTs: Long): QueryResult = {
-    val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs).get
-    val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore =>
+  protected def buildEdgesToDelete(stepInnerResult: StepInnerResult, requestTs: Long): StepInnerResult = {
+    val filtered = stepInnerResult.edgesWithScoreLs.filter { edgeWithScore =>
       (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
-    }.map { edgeWithScore =>
-      val label = queryRequest.queryParam.label
-      val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
-        case "strong" =>
-          val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
-            Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
-          (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
-        case _ =>
-          val oldEdge = edgeWithScore.edge
-          (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
-      }
+    }
+    if (filtered.isEmpty) StepInnerResult.Empty
+    else {
+      val head = filtered.head
+      val label = head.edge.label
+      val edgeWithScoreLs = filtered.map { edgeWithScore =>
+        val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
+          case "strong" =>
+            val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
+              Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
+            (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
+          case _ =>
+            val oldEdge = edgeWithScore.edge
+            (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
+        }
 
-      val copiedEdge =
-        edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
+        val copiedEdge =
+          edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
 
-      val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
-//      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
-      edgeToDelete
+        val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
+        //      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
+        edgeToDelete
+      }
+      //Degree edge?
+      StepInnerResult(edgeWithScoreLs, Nil, false)
     }
-
-    queryResult.copy(edgeWithScoreLs = edgeWithScoreLs)
   }
 
-  protected def deleteAllFetchedEdgesLs(queryRequestWithResultLs: Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = {
-    val queryResultLs = queryRequestWithResultLs.map(_.queryResult)
-    queryResultLs.foreach { queryResult =>
-      if (queryResult.isFailure) throw new RuntimeException("fetched result is fallback.")
+  protected def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepInnerResult],
+                                        requestTs: Long): Future[(Boolean, Boolean)] = {
+    stepInnerResultLs.foreach { stepInnerResult =>
+      if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
     }
     val futures = for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs)
-      if deleteQueryResult.edgeWithScoreLs.nonEmpty
+      stepInnerResult <- stepInnerResultLs
+      deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
+      if deleteStepInnerResult.edgesWithScoreLs.nonEmpty
     } yield {
-        val label = queryRequest.queryParam.label
+        val head = deleteStepInnerResult.edgesWithScoreLs.head
+        val label = head.edge.label
         label.schemaVersion match {
           case HBaseType.VERSION3 | HBaseType.VERSION4 =>
             if (label.consistencyLevel == "strong") {
-              /*
+              /**
                * read: snapshotEdge on queryResult = O(N)
                * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
                */
-              mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity))
+              mutateEdges(deleteStepInnerResult.edgesWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity))
             } else {
-              deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+              deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
             }
           case _ =>
 
-            /*
+            /**
              * read: x
              * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
              */
-            deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+            deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
         }
       }
 
@@ -923,10 +933,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     }
   }
 
-  protected def fetchAndDeleteAll(query: Query, requestTs: Long): Future[(Boolean, Boolean)] = {
+  protected def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
     val future = for {
-      queryRequestWithResultLs <- getEdges(query)
-      (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, requestTs)
+      stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_)))
+      (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
     } yield {
 //        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
         (allDeleted, ret)
@@ -960,19 +970,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     }
 
     val requestTs = ts
-    val queryParams = for {
+    /** create query per label */
+    val queries = for {
       label <- labels
     } yield {
         val labelWithDir = LabelWithDirection(label.id.get, dir)
-        QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
+        val queryParam = QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
+        val step = Step(List(queryParam))
+        Query(srcVertices, Vector(step))
       }
 
-    val step = Step(queryParams.toList)
-    val q = Query(srcVertices, Vector(step))
-
     //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
-    val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) {
-      fetchAndDeleteAll(q, requestTs)
+        val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
+      fetchAndDeleteAll(queries, requestTs)
     } { case (allDeleted, deleteSuccess) =>
       allDeleted && deleteSuccess
     }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
@@ -1039,7 +1049,9 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
                                queryParam: QueryParam,
                                prevScore: Double = 1.0,
                                isInnerCall: Boolean,
-                               parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+                               parentEdges: Seq[EdgeWithScore],
+                               startOffset: Int = 0,
+                               len: Int = Int.MaxValue): Seq[EdgeWithScore] = {
     if (kvs.isEmpty) Seq.empty
     else {
       val first = kvs.head
@@ -1050,7 +1062,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
         else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)
 
       for {
-        kv <- kvs
+        (kv, idx) <- kvs.zipWithIndex if idx >= startOffset && idx < startOffset + len
         edge <-
         if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, isInnerCall, parentEdges)
         else toEdge(kv, queryParam, cacheElementOpt, parentEdges)
@@ -1071,19 +1083,6 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
 
   /** End Of Parse Logic */
 
-//  /** methods for consistency */
-//  protected def writeAsyncSimple(zkQuorum: String, elementRpcs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
-//    if (elementRpcs.isEmpty) {
-//      Future.successful(true)
-//    } else {
-//      val futures = elementRpcs.map { rpc => writeToStorage(rpc, withWait) }
-//      Future.sequence(futures).map(_.forall(identity))
-//    }
-//  }
-
-
-  //  def futureCache[T] = Cache[Long, (Long, T)]
-
   protected def toRequestEdge(queryRequest: QueryRequest): Edge = {
     val srcVertex = queryRequest.vertex
     //    val tgtVertexOpt = queryRequest.tgtVertexOpt
@@ -1095,7 +1094,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
     val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match {
       case Some(tgtVertexId) => // _to is given.
-        /* we use toSnapshotEdge so dont need to swap src, tgt */
+        /** we use toSnapshotEdge so dont need to swap src, tgt */
         val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
         val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion)
         (src, tgt)
@@ -1135,27 +1134,26 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     }
   }
 
-  protected def fetchStep(orgQuery: Query, queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = {
-    if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil)
+  protected def fetchStep(orgQuery: Query,
+                          stepIdx: Int,
+                          stepInnerResult: StepInnerResult): Future[StepInnerResult] = {
+    if (stepInnerResult.isEmpty) Future.successful(StepInnerResult.Empty)
     else {
-      val queryRequest = queryRequestWithResultsLs.head.queryRequest
-      val q = orgQuery
-      val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult)
+      val edgeWithScoreLs = stepInnerResult.edgesWithScoreLs
 
-      val stepIdx = queryRequest.stepIdx + 1
+      val q = orgQuery
 
       val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
       val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
       val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
       val step = q.steps(stepIdx)
+
       val alreadyVisited =
         if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
-        else Graph.alreadyVisitedVertices(queryResultsLs)
+        else Graph.alreadyVisitedVertices(stepInnerResult.edgesWithScoreLs)
 
-      val groupedBy = queryResultsLs.flatMap { queryResult =>
-        queryResult.edgeWithScoreLs.map { case edgeWithScore =>
-          edgeWithScore.edge.tgtVertex -> edgeWithScore
-        }
+      val groupedBy = edgeWithScoreLs.map { case edgeWithScore =>
+        edgeWithScore.edge.tgtVertex -> edgeWithScore
       }.groupBy { case (vertex, edgeWithScore) => vertex }
 
       val groupedByFiltered = for {
@@ -1178,39 +1176,48 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
         queryParam <- step.queryParams
       } yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore)
 
-      Graph.filterEdges(fetches(queryRequests, prevStepTgtVertexIdEdges), alreadyVisited)(ec)
+      val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
+      Graph.filterEdges(orgQuery, stepIdx, queryRequests.map(_._1), fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited)(ec)
     }
   }
-
-  protected def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = {
-    for {
-      queryRequestWithResultLs <- queryRequestWithResultLsFuture
-      ret <- fetchStep(orgQuery, queryRequestWithResultLs)
-    } yield ret
+  /** Runs every step of `q` in order, feeding each step's result into the next one. */
+  private def getEdgesStepInner(q: Query): Future[StepInnerResult] = {
+    val attempt = Try {
+      if (q.steps.isEmpty) innerFallback
+      else {
+        // the seed built from the start vertices plays the role of "step -1".
+        val seed = Future.successful(QueryResult.fromVertices(q))
+        q.steps.indices.foldLeft(seed) { (prevFuture, stepIdx) =>
+          prevFuture.flatMap { prev => fetchStep(q, stepIdx, prev) }
+        }
+      }
+    }
+    // only exceptions thrown synchronously while wiring the chain are caught here.
+    attempt.recover {
+      case e: Exception =>
+        logger.error(s"getEdgesAsync: $e", e)
+        innerFallback
+    }.get
   }
-
-  def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = {
-    val fallback = {
-      val queryRequest = QueryRequest(query = q, stepIdx = 0, q.vertices.head, queryParam = QueryParam.Empty)
-      Future.successful(q.vertices.map(v => QueryRequestWithResult(queryRequest, QueryResult())))
-    }
+  def getEdges(q: Query): Future[StepResult] = {
     Try {
-
       if (q.steps.isEmpty) {
         // TODO: this should be get vertex query.
         fallback
       } else {
-        // current stepIdx = -1
-        val startQueryResultLs = QueryResult.fromVertices(q)
-        q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, step) =>
-            fetchStepFuture(q, acc)
-//          fetchStepFuture(q, acc).map { stepResults =>
-//            step.queryParams.zip(stepResults).foreach { case (qParam, queryRequestWithResult)  =>
-//              val cursor = Base64.getEncoder.encodeToString(queryRequestWithResult.queryResult.tailCursor)
-//              qParam.cursorOpt = Option(cursor)
-//            }
-//            stepResults
-//          }
+        val filterOutFuture = q.queryOption.filterOutQuery match {
+          case None => innerFallback
+          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+        }
+        for {
+          innerResult <- getEdgesStepInner(q)
+          filterOutInnerResult <- filterOutFuture
+        } yield {
+          val result = StepResult(graph, q.queryOption, innerResult)
+          if (filterOutInnerResult.isEmpty) result
+          else {
+            StepResult.filterOut(graph, q.queryOption, result, filterOutInnerResult)
+          }
         }
       }
     } recover {
@@ -1220,7 +1227,34 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     } get
   }
 
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = {
+  /** Runs each query of `mq` via getEdges, combines them with StepResult.merges and applies the filterOut query. */
+  def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
+    val fallback = Future.successful(StepResult.Empty)
+    Try {
+      if (mq.queries.isEmpty) fallback
+      else {
+        // created before the for-comprehension, so it is not sequenced behind the sub-queries.
+        val filterOutFuture = mq.queryOption.filterOutQuery match {
+          case None => innerFallback
+          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+        }
+        val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
+        for {
+          multiQueryResults <- multiQueryFutures
+          filterOutInnerResult <- filterOutFuture
+        } yield {
+          val merged = StepResult.merges(mq.queryOption, multiQueryResults, mq.weights)
+          StepResult.filterOut(graph, mq.queryOption, merged, filterOutInnerResult)
+        }
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesMultiQuery: $e", e)
+        fallback
+    } get
+  }
+
+  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = {
     val ts = System.currentTimeMillis()
     val futures = for {
       (srcVertex, tgtVertex, queryParam) <- params
@@ -1228,15 +1262,18 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
       edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
     } yield {
         fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
-          val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
-          val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
-          val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
-          val queryResult = QueryResult(edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0)))
-          QueryRequestWithResult(queryRequest, queryResult)
+          edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0))
         }
       }
 
-    Future.sequence(futures)
+    Future.sequence(futures).map { edgeWithScoreLs =>
+      val s2EdgeWithScoreLs = edgeWithScoreLs.flatMap { ls =>
+        ls.map { edgeWithScore =>
+          S2EdgeWithScore(edgeWithScore.edge, edgeWithScore.score)
+        }
+      }
+      StepResult(results = s2EdgeWithScoreLs, grouped = Nil, degreeEdges = Nil)
+    }
   }
 
 
@@ -1266,6 +1303,13 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     }
 
   }
+
+  /** Scales scores so they sum to 1.0; input whose total is 0 is returned untouched (avoids NaN/Infinity). */
+  protected def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+    val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
+    if (sum == 0.0) edgeWithScores
+    else edgeWithScores.map { edgeWithScore => edgeWithScore.copy(score = edgeWithScore.score / sum) }
+  }
   /** end of query */
 
   /** Mutation Builder */
@@ -1290,19 +1334,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
       case (true, true) =>
 
-        /* when there is no need to update. shouldUpdate == false */
+        /** when there is no need to update. shouldUpdate == false */
         List.empty
       case (true, false) =>
 
-        /* no edges to delete but there is new edges to insert so increase degree by 1 */
+        /** no edges to delete but there is new edges to insert so increase degree by 1 */
         edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
       case (false, true) =>
 
-        /* no edges to insert but there is old edges to delete so decrease degree by 1 */
+        /** no edges to insert but there is old edges to delete so decrease degree by 1 */
         edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
       case (false, false) =>
 
-        /* update on existing edges so no change on degree */
+        /** update on existing edges so no change on degree */
         List.empty
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 138216b..b52ba53 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -82,8 +82,9 @@ object AsynchbaseStorage {
 }
 
 
-class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext)
-  extends Storage[Deferred[QueryRequestWithResult]](config) {
+class AsynchbaseStorage(override val graph: Graph,
+                        override val config: Config)(implicit ec: ExecutionContext)
+  extends Storage[Deferred[StepInnerResult]](graph, config) {
 
   import Extensions.DeferOps
 
@@ -100,14 +101,16 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
   private val emptyKeyValues = new util.ArrayList[KeyValue]()
   private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
 
-  import CanDefer._
-
   /** Future Cache to squash request */
-  private val futureCache = new DeferCache[QueryResult, Deferred, Deferred](config, QueryResult(), "FutureCache", useMetric = true)
+  private val futureCache = new DeferCache[StepInnerResult, Deferred, Deferred](config, StepInnerResult.Empty, "FutureCache", useMetric = true)
 
   /** Simple Vertex Cache */
   private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
 
+  private val zkQuorum = config.getString("hbase.zookeeper.quorum")
+  private val zkQuorumSlave = // NOTE(review): reads the same key as zkQuorum, so this always mirrors it — confirm the intended slave key
+    if (config.hasPath("hbase.zookeeper.quorum")) Option(config.getString("hbase.zookeeper.quorum"))
+    else None
 
   /**
    * fire rpcs into proper hbase cluster using client and
@@ -241,7 +244,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
         if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
 
         scanner.setMaxVersions(1)
-        scanner.setMaxNumRows(queryParam.limit)
+        scanner.setMaxNumRows(queryParam.offset + queryParam.limit)
         scanner.setMaxTimestamp(maxTs)
         scanner.setMinTimestamp(minTs)
         scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
@@ -280,21 +283,38 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
   override def fetch(queryRequest: QueryRequest,
                      prevStepScore: Double,
                      isInnerCall: Boolean,
-                     parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = {
+                     parentEdges: Seq[EdgeWithScore]): Deferred[StepInnerResult] = {
+
+    def fetchInner(hbaseRpc: AnyRef): Deferred[StepInnerResult] = {
+      val queryParam = queryRequest.queryParam
 
-    def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
       fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
-        val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
-        val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
-          sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
-        } else edgeWithScores
-        QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
-//        QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty)))
+        val (startOffset, length) = queryParam.label.schemaVersion match {
+          case HBaseType.VERSION4 => (queryParam.offset, queryParam.limit)
+          case _ => (0, kvs.length)
+        }
+
+        val edgeWithScores = toEdges(kvs, queryParam, prevStepScore, isInnerCall, parentEdges, startOffset, length)
+        if (edgeWithScores.isEmpty) StepInnerResult.Empty
+        else {
+          val head = edgeWithScores.head
+          val (degreeEdges, indexEdges) =
+            if (head.edge.isDegree) (Seq(head), edgeWithScores.tail)
+            else (Nil, edgeWithScores)
 
+          val normalized =
+            if (queryRequest.queryParam.shouldNormalize) normalize(indexEdges)
+            else indexEdges
+
+          val sampled = if (queryRequest.queryParam.sample >= 0) {
+            sample(queryRequest, normalized, queryRequest.queryParam.sample)
+          } else normalized
+
+          StepInnerResult(edgesWithScoreLs = sampled, degreeEdges)
+        }
       } recoverWith { ex =>
         logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
-        QueryResult(isFailure = true)
-//        QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
+        StepInnerResult.Failure
       }
     }
 
@@ -302,27 +322,25 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
     val cacheTTL = queryParam.cacheTTLInMillis
     val request = buildRequest(queryRequest)
 
-    val defer =
-      if (cacheTTL <= 0) fetchInner(request)
-      else {
-        val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
-        val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
-        futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+    if (cacheTTL <= 0) fetchInner(request)
+    else {
+      val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+      val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+      futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
     }
-    defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)}
   }
 
 
   override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)],
-                       prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[QueryRequestWithResult]] = {
-    val defers: Seq[Deferred[QueryRequestWithResult]] = for {
+                       prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[StepInnerResult]] = {
+    val defers: Seq[Deferred[StepInnerResult]] = for {
       (queryRequest, prevStepScore) <- queryRequestWithScoreLs
       parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
     } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
 
-    val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers)
+    val grouped: Deferred[util.ArrayList[StepInnerResult]] = Deferred.group(defers)
     grouped withCallback {
-      queryResults: util.ArrayList[QueryRequestWithResult] =>
+      queryResults: util.ArrayList[StepInnerResult] =>
         queryResults.toIndexedSeq
     } toFuture
   }
@@ -371,47 +389,56 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
   }
 
 
-  override def createTable(zkAddr: String,
+  override def createTable(_zkAddr: String,
                            tableName: String,
                            cfs: List[String],
                            regionMultiplier: Int,
                            ttl: Option[Int],
                            compressionAlgorithm: String): Unit = {
-    logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
-    val admin = getAdmin(zkAddr)
-    val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
-    try {
-      if (!admin.tableExists(TableName.valueOf(tableName))) {
-        try {
-          val desc = new HTableDescriptor(TableName.valueOf(tableName))
-          desc.setDurability(Durability.ASYNC_WAL)
-          for (cf <- cfs) {
-            val columnDesc = new HColumnDescriptor(cf)
-              .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
-              .setBloomFilterType(BloomType.ROW)
-              .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-              .setMaxVersions(1)
-              .setTimeToLive(2147483647)
-              .setMinVersions(0)
-              .setBlocksize(32768)
-              .setBlockCacheEnabled(true)
-            if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
-            desc.addFamily(columnDesc)
-          }
+    /** TODO: Decide if we will allow each app server to connect to multiple hbase cluster */
+    for {
+      zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
+    } {
+      logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
+      val admin = getAdmin(zkAddr)
+      val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+      try {
+        if (!admin.tableExists(TableName.valueOf(tableName))) {
+          try {
+            val desc = new HTableDescriptor(TableName.valueOf(tableName))
+            desc.setDurability(Durability.ASYNC_WAL)
+            for (cf <- cfs) {
+              val columnDesc = new HColumnDescriptor(cf)
+                .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+                .setBloomFilterType(BloomType.ROW)
+                .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+                .setMaxVersions(1)
+                .setTimeToLive(2147483647)
+                .setMinVersions(0)
+                .setBlocksize(32768)
+                .setBlockCacheEnabled(true)
+              if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+              desc.addFamily(columnDesc)
+            }
 
-          if (regionCount <= 1) admin.createTable(desc)
-          else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
-        } catch {
-          case e: Throwable =>
-            logger.error(s"$zkAddr, $tableName failed with $e", e)
-            throw e
+            if (regionCount <= 1) admin.createTable(desc)
+            else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+          } catch {
+            case e: Throwable =>
+              logger.error(s"$zkAddr, $tableName failed with $e", e)
+              throw e
+          }
+        } else {
+          logger.info(s"$zkAddr, $tableName, $cfs already exist.")
         }
-      } else {
-        logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+      } catch {
+        case e: Throwable =>
+          logger.error(s"$zkAddr, $tableName failed with $e", e)
+          throw e
+      } finally {
+        admin.close()
+        admin.getConnection.close()
       }
-    } finally {
-      admin.close()
-      admin.getConnection.close()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index 83d4338..c700e53 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -63,5 +63,5 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
     else if (indexEdge.op == GraphUtil.operations("incrementCount"))
       Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
     else propsToKeyValues(indexEdge.metas.toSeq)
-  
+
  }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
index 4149540..b402c0f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
@@ -51,6 +51,8 @@ object logger {
   private val logger = LoggerFactory.getLogger("application")
   private val errorLogger = LoggerFactory.getLogger("error")
   private val metricLogger = LoggerFactory.getLogger("metrics")
+  private val queryLogger = LoggerFactory.getLogger("query")
+  private val malformedLogger = LoggerFactory.getLogger("malformed")
 
   def metric[T: Loggable](msg: => T) = metricLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
 
@@ -61,6 +63,10 @@ object logger {
   def error[T: Loggable](msg: => T, exception: => Throwable) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
 
   def error[T: Loggable](msg: => T) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg))
+
+  def query[T: Loggable](msg: => T) = queryLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
+
+  def malformed[T: Loggable](msg: => T, exception: => Throwable) = malformedLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
index a018c01..6933320 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
@@ -19,10 +19,12 @@
 
 package org.apache.s2graph.core
 
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
 import org.apache.s2graph.core.utils.logger
 import org.scalatest.FunSuite
+import play.api.libs.json.{JsObject, Json}
 
 class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
   initTests()
@@ -39,7 +41,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
     val (srcId, tgtId, labelName) = ("1", "2", testLabelName)
 
     val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield {
-      Management.toEdge(ts.toLong, op, srcId, tgtId, labelName, "out", props).toLogString
+      val properties = fromJsonToProperties(Json.parse(props).as[JsObject])
+      Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, op).toLogString
     }).mkString("\n")
 
     val expected = Seq(