You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 16:33:39 UTC

[6/7] incubator-s2graph git commit: [S2GRAPH-122]: Change data types of Edge/IndexEdge/SnapshotEdge.

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index 2cbddea..ca32a14 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -19,25 +19,31 @@
 
 package org.apache.s2graph.core
 
-import java.util
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Executors
 
 import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.hadoop.fs.Path
+import org.apache.s2graph.core.GraphExceptions.{FetchAllStepFailException, FetchTimeoutException, LabelNotExistException}
 import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{Label, Model}
-import org.apache.s2graph.core.parsers.WhereParser
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Model, Service}
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
-import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
-import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.storage.{SKeyValue, Storage}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.utils.{DeferCache, Extensions, SafeUpdateCache, logger}
 import play.api.libs.json.{JsObject, Json}
 
+import scala.annotation.tailrec
 import scala.collection.JavaConversions._
-import scala.collection._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.concurrent._
-import scala.util.Try
+import scala.util.{Random, Try}
 
 object Graph {
+
+  type HashKey = (Int, Int, Int, Int, Boolean)
+  type FilterHashKey = (Int, Int)
+
+
   val DefaultScore = 1.0
 
   private val DefaultConfigs: Map[String, AnyRef] = Map(
@@ -65,26 +71,130 @@ object Graph {
     "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
     "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
     "future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
+    "query.future.cache.max.size" -> java.lang.Integer.valueOf(1000),
+    "query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
+    "query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
+    "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
     "s2graph.storage.backend" -> "hbase",
-    "query.hardlimit" -> java.lang.Integer.valueOf(100000)
+    "query.hardlimit" -> java.lang.Integer.valueOf(100000),
+    "hbase.zookeeper.znode.parent" -> "/hbase",
+    "query.log.sample.rate" -> Double.box(0.05)
   )
 
   var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
 
-  /** helpers for filterEdges */
-  type HashKey = (Int, Int, Int, Int, Boolean)
-  type FilterHashKey = (Int, Int)
-  type Result = (ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
-    ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
-    ListBuffer[(HashKey, FilterHashKey, Edge, Double)])
+  def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
+    val parts = GraphUtil.split(s)
+    val logType = parts(2)
+    val element = if (logType == "edge" | logType == "e") {
+      /** currently, only edges are considered for bulk loading */
+      labelMapping.get(parts(5)) match {
+        case None =>
+        case Some(toReplace) =>
+          parts(5) = toReplace
+      }
+      toEdge(parts)
+    } else if (logType == "vertex" | logType == "v") {
+      toVertex(parts)
+    } else {
+      throw new GraphExceptions.JsonParseException("log type is not exist in log.")
+    }
 
-  def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
-    val src = edge.srcVertex.innerId.hashCode()
-    val tgt = edge.tgtVertex.innerId.hashCode()
-    val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
-    val filterHashKey = (src, tgt)
+    element
+  } recover {
+    case e: Exception =>
+      logger.error(s"[toElement]: $e", e)
+      None
+  } get
 
-    (hashKey, filterHashKey)
+
+  /** Splits `s` with `GraphUtil.split` and parses the resulting fields as a vertex. */
+  def toVertex(s: String): Option[Vertex] =
+    toVertex(GraphUtil.split(s))
+
+  /** Splits `s` with `GraphUtil.split` and parses the resulting fields as an edge. */
+  def toEdge(s: String): Option[Edge] =
+    toEdge(GraphUtil.split(s))
+
+  /** Parses split fields into an edge; field 6 (props JSON) and field 7 (direction) are optional. Exceptions are logged and rethrown. */
+  def toEdge(parts: Array[String]): Option[Edge] = Try {
+    val (ts, operation, _, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+    val props =
+      if (parts.length < 7) Map.empty[String, Any]
+      else fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj()))
+    // anything other than an explicit "in" (including a missing field) is treated as "out"
+    val direction = if (parts.length >= 8 && parts(7) == "in") "in" else "out"
+    Option(Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation))
+  }.recover {
+    case cause: Exception => logger.error(s"[toEdge]: $cause", cause); throw cause
+  }.get
+
+  /** Parses split fields (ts, operation, logType, id, service, column, optional props JSON) into a vertex; errors are logged and rethrown. */
+  def toVertex(parts: Array[String]): Option[Vertex] = Try {
+    val (ts, operation, _, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+    val props =
+      if (parts.length < 7) Map.empty[String, Any]
+      else fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj()))
+    Option(Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation))
+  }.recover {
+    case cause: Throwable => logger.error(s"[toVertex]: $cause", cause); throw cause
+  }.get
+
+  /** Creates the storage selected by `s2graph.storage.backend`; only "hbase" is supported. */
+  def initStorage(graph: Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = {
+    val backendName = config.getString("s2graph.storage.backend")
+    logger.info(s"[InitStorage]: $backendName")
+
+    // every other backend name is rejected with a RuntimeException
+    if (backendName == "hbase") new AsynchbaseStorage(graph, config)(ec)
+    else throw new RuntimeException("not supported storage.")
+  }
+
+  /**
+    * Extracts the entries of `config` whose key starts with `prefix` into a new Config,
+    * with that leading prefix removed from each key.
+    */
+  def parseCacheConfig(config: Config, prefix: String): Config = {
+    import scala.collection.JavaConversions._
+    val kvs = new java.util.HashMap[String, AnyRef]()
+    // stripPrefix drops only the leading match; replace() would also eat later occurrences inside the key
+    config.entrySet().foreach { entry =>
+      if (entry.getKey.startsWith(prefix)) kvs.put(entry.getKey.stripPrefix(prefix), entry.getValue.unwrapped())
+    }
+    ConfigFactory.parseMap(kvs)
+  }
+
+  /** Global helper functions. randomInt: `sampleNumber` distinct ints from [0, range); `set` is returned as-is when range < sampleNumber or it is already large enough. */
+  @tailrec
+  final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
+    if (range < sampleNumber || set.size >= sampleNumber) set
+    else randomInt(sampleNumber, range, set + Random.nextInt(range))
+  }
+
+  /**
+    * Randomly keeps `n` of the given edges.
+    *
+    * Inputs with at most `n` edges are returned untouched. Otherwise the head edge is skipped
+    * when the query offset is 0, indices are drawn with `randomInt`, and the kept edges come
+    * back in reverse input order.
+    */
+  def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] =
+    if (edges.size <= n) edges
+    else {
+      val candidates = if (queryRequest.queryParam.offset == 0) edges.tail else edges
+      val chosen = randomInt(n, candidates.size)
+      // prepending while folding is what leaves the kept edges in reverse input order
+      candidates.zipWithIndex.foldLeft(List.empty[EdgeWithScore]) {
+        case (kept, (edgeWithScore, position)) =>
+          if (chosen.contains(position)) edgeWithScore :: kept else kept
+      }
+    }
+
+  /** Rescales scores to sum to 1; a zero total leaves the input untouched rather than producing NaN/Infinity scores. */
+  def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+    val total = edgeWithScores.foldLeft(0.0)(_ + _.score)
+    if (total == 0.0) edgeWithScores
+    else edgeWithScores.map(edgeWithScore => edgeWithScore.copy(score = edgeWithScore.score / total))
   }
 
   def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, Vertex), Boolean] = {
@@ -100,7 +210,7 @@ object Graph {
   /** common methods for filter out, transform, aggregate queryResult */
   def convertEdges(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = {
     for {
-      convertedEdge <- queryParam.transformer.transform(edge, nextStepOpt) if !edge.isDegree
+      convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
     } yield convertedEdge
   }
 
@@ -110,18 +220,17 @@ object Graph {
       case None => 1.0
       case Some(timeDecay) =>
         val tsVal = try {
-          val labelMeta = edge.label.metaPropsMap(timeDecay.labelMetaSeq)
-          val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMetaSeq)
+          val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMeta)
           innerValWithTsOpt.map { innerValWithTs =>
             val innerVal = innerValWithTs.innerVal
-            labelMeta.dataType match {
+            timeDecay.labelMeta.dataType match {
               case InnerVal.LONG => innerVal.value match {
                 case n: BigDecimal => n.bigDecimal.longValue()
                 case _ => innerVal.toString().toLong
               }
               case _ => innerVal.toString().toLong
             }
-          } getOrElse(edge.ts)
+          } getOrElse (edge.ts)
         } catch {
           case e: Exception =>
             logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
@@ -134,264 +243,891 @@ object Graph {
     tsVal
   }
 
-  def processDuplicates(queryParam: QueryParam,
-                        duplicates: Seq[(FilterHashKey, EdgeWithScore)]): Seq[(FilterHashKey, EdgeWithScore)] = {
+  def processDuplicates[T](queryParam: QueryParam,
+                           duplicates: Seq[(FilterHashKey, T)])(implicit ev: WithScore[T]): Seq[(FilterHashKey, T)] = {
 
     if (queryParam.label.consistencyLevel != "strong") {
       //TODO:
       queryParam.duplicatePolicy match {
-        case Query.DuplicatePolicy.First => Seq(duplicates.head)
-        case Query.DuplicatePolicy.Raw => duplicates
-        case Query.DuplicatePolicy.CountSum =>
+        case DuplicatePolicy.First => Seq(duplicates.head)
+        case DuplicatePolicy.Raw => duplicates
+        case DuplicatePolicy.CountSum =>
           val countSum = duplicates.size
-          Seq(duplicates.head._1 -> duplicates.head._2.copy(score = countSum))
+          val (headFilterHashKey, headEdgeWithScore) = duplicates.head
+          Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum))
         case _ =>
-          val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + current._2.score }
-          Seq(duplicates.head._1 -> duplicates.head._2.copy(score = scoreSum))
+          val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) }
+          val (headFilterHashKey, headEdgeWithScore) = duplicates.head
+          Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum))
       }
     } else {
       duplicates
     }
   }
+
+  /** Derives (src, labelId, dir, tgt, isDegree) plus the (src, tgt) filter key, using hashCodes of the vertex inner ids. */
+  def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
+    val srcHash = edge.srcVertex.innerId.hashCode()
+    val tgtHash = edge.tgtVertex.innerId.hashCode()
+    val dedupKey = (srcHash, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgtHash, isDegree)
+
+    dedupKey -> (srcHash, tgtHash)
+  }
+
   def filterEdges(q: Query,
                   stepIdx: Int,
                   queryRequests: Seq[QueryRequest],
-                  queryResultLsFuture: Future[Seq[StepInnerResult]],
+                  queryResultLsFuture: Future[Seq[StepResult]],
                   queryParams: Seq[QueryParam],
-                  alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean])
-                 (implicit ec: scala.concurrent.ExecutionContext): Future[StepInnerResult] = {
+                  alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty,
+                  buildLastStepInnerResult: Boolean = true,
+                  parentEdges: Map[VertexId, Seq[EdgeWithScore]])
+                 (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = {
 
     queryResultLsFuture.map { queryRequestWithResultLs =>
-      if (queryRequestWithResultLs.isEmpty) StepInnerResult.Empty
+      val (cursors, failCount) = {
+        val _cursors = ArrayBuffer.empty[Array[Byte]]
+        var _failCount = 0
+
+        queryRequestWithResultLs.foreach { stepResult =>
+          _cursors.append(stepResult.cursors: _*)
+          _failCount += stepResult.failCount
+        }
+
+        _cursors -> _failCount
+      }
+
+
+      if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount)
       else {
+        val isLastStep = stepIdx == q.steps.size - 1
+        val queryOption = q.queryOption
         val step = q.steps(stepIdx)
 
-        val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None
-
-        val excludeLabelWithDirSet = new util.HashSet[(Int, Int)]
-        val includeLabelWithDirSet = new util.HashSet[(Int, Int)]
-        step.queryParams.filter(_.exclude).foreach(l => excludeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir))
-        step.queryParams.filter(_.include).foreach(l => includeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir))
-
-        val edgesToExclude = new util.HashSet[FilterHashKey]()
-        val edgesToInclude = new util.HashSet[FilterHashKey]()
-
-        val sequentialLs = new ListBuffer[(HashKey, FilterHashKey, EdgeWithScore)]()
-        val agg = new mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, EdgeWithScore)]]()
-        val params = new mutable.HashMap[HashKey, QueryParam]()
-        var numOfDuplicates = 0
-        var numOfTotal = 0
-        queryRequests.zip(queryRequestWithResultLs).foreach { case (queryRequest, stepInnerResult) =>
-          val queryParam = queryRequest.queryParam
-          val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
-          val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir
-          val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey)
-          val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey)
-          val where = queryParam.where.get
-
-          for {
-            edgeWithScore <- stepInnerResult.edgesWithScoreLs
-            if where == WhereParser.success || where.filter(edgeWithScore.edge)
-            (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-          } {
-            numOfTotal += 1
-            if (queryParam.transformer.isDefault) {
-              val convertedEdge = edge
-
-              val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
-
-              /** check if this edge should be exlcuded. */
-              if (shouldBeExcluded) {
-                edgesToExclude.add(filterHashKey)
-              } else {
-                if (shouldBeIncluded) {
-                  edgesToInclude.add(filterHashKey)
-                }
-                val tsVal = processTimeDecay(queryParam, convertedEdge)
-                val newScore = labelWeight * score * tsVal
-                val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
-                sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
-                agg.get(hashKey) match {
-                  case None =>
-                    val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
-                    newLs += (filterHashKey -> newEdgeWithScore)
-                    agg += (hashKey -> newLs)
-                  case Some(old) =>
-                    numOfDuplicates += 1
-                    old += (filterHashKey -> newEdgeWithScore)
-                }
-                params += (hashKey -> queryParam)
+        val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs)
+        val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult
+        val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
+
+        if (shouldBuildInnerResults) {
+          val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
+            edgeWithScore
+          }
+
+          /** process step group by */
+          val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
+          StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
+
+        } else {
+          val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
+            val edge = edgeWithScore.edge
+            val score = edgeWithScore.score
+            val label = edgeWithScore.label
+
+            /** Select */
+            val mergedPropsWithTs =
+            if (queryOption.selectColumns.isEmpty) {
+              label.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
+                labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultVal)
               }
             } else {
-              convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge =>
-                val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
-
-                /** check if this edge should be exlcuded. */
-                if (shouldBeExcluded) {
-                  edgesToExclude.add(filterHashKey)
-                } else {
-                  if (shouldBeIncluded) {
-                    edgesToInclude.add(filterHashKey)
-                  }
-                  val tsVal = processTimeDecay(queryParam, convertedEdge)
-                  val newScore = labelWeight * score * tsVal
-                  val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
-                  sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
-                  agg.get(hashKey) match {
-                    case None =>
-                      val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
-                      newLs += (filterHashKey -> newEdgeWithScore)
-                      agg += (hashKey -> newLs)
-                    case Some(old) =>
-                      numOfDuplicates += 1
-                      old += (filterHashKey -> newEdgeWithScore)
-                  }
-                  params += (hashKey -> queryParam)
-                }
+              val initial = Map(LabelMeta.timestamp -> edge.propsWithTs(LabelMeta.timestamp))
+              propsSelectColumns.foldLeft(initial) { case (prev, labelMeta) =>
+                prev + (labelMeta -> edge.propsWithTs.getOrElse(labelMeta, label.metaPropsDefaultMapInner(labelMeta)))
               }
             }
-          }
-        }
 
+            val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+            val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
+            /** OrderBy */
+            val orderByValues =
+             if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
+              else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
+
+            /** StepGroupBy */
+            val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
+
+            /** GroupBy */
+            val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
+
+            /** FilterOut */
+            val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
 
-        val edgeWithScoreLs = new ListBuffer[EdgeWithScore]()
-        if (numOfDuplicates == 0) {
-          // no duplicates at all.
-          for {
-            (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
-            if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
-          } {
-            edgeWithScoreLs += edgeWithScore
+            newEdgeWithScore.copy(orderByValues = orderByValues,
+              stepGroupByValues = stepGroupByValues,
+              groupByValues = groupByValues,
+              filterOutValues = filterOutValues)
           }
-        } else {
-          // need to resolve duplicates.
-          val seen = new mutable.HashSet[HashKey]()
-          for {
-            (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
-            if !seen.contains(hashKey)
-            if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
-          } {
-            val queryParam = params(hashKey)
-            val duplicates = processDuplicates(queryParam, agg(hashKey))
-            duplicates.foreach { case (_, duplicate) =>
-              if (duplicate.score >= queryParam.threshold) {
-                seen += hashKey
-                edgeWithScoreLs += duplicate
+
+          /** process step group by */
+          val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
+
+          /** process ordered list */
+          val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
+
+          /** process grouped list */
+          val grouped =
+          if (queryOption.groupBy.keys.isEmpty) Nil
+          else {
+            val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]()
+            results.groupBy { edgeWithScore =>
+              //                edgeWithScore.groupByValues.map(_.map(_.toString))
+              edgeWithScore.groupByValues
+            }.foreach { case (k, ls) =>
+              val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption)
+
+              val newScoreSum = scoreSum
+
+              /**
+                * watch out here. by calling toString on Any, we lose type information which will be used
+                * later for toJson.
+                */
+              if (merged.nonEmpty) {
+                val newKey = merged.head.groupByValues
+                agg += (newKey -> (newScoreSum, merged))
               }
             }
+            agg.toSeq.sortBy(_._2._1 * -1)
           }
-        }
 
-        val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
-        StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, degreeEdges = degrees)
+          StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount)
+        }
       }
     }
   }
 
-  def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
-    val parts = GraphUtil.split(s)
-    val logType = parts(2)
-    val element = if (logType == "edge" | logType == "e") {
-      /* current only edge is considered to be bulk loaded */
-      labelMapping.get(parts(5)) match {
-        case None =>
-        case Some(toReplace) =>
-          parts(5) = toReplace
+  private def toEdgeWithScores(queryRequest: QueryRequest,
+                               stepResult: StepResult,
+                               parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = {
+    val queryOption = queryRequest.query.queryOption
+    val queryParam = queryRequest.queryParam
+    val prevScore = queryRequest.prevStepScore
+    val labelWeight = queryRequest.labelWeight
+    val edgeWithScores = stepResult.edgeWithScores
+
+    val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
+    val parents = if (shouldBuildParents) {
+      parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore =>
+        val edge = edgeWithScore.edge
+        val score = edgeWithScore.score
+        val label = edgeWithScore.label
+
+        /** Select */
+        val mergedPropsWithTs =
+          if (queryOption.selectColumns.isEmpty) {
+            label.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
+              labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultVal)
+            }
+          } else {
+            val initial = Map(LabelMeta.timestamp -> edge.propsWithTs(LabelMeta.timestamp))
+            queryOption.selectColumns.foldLeft(initial) { case (acc, labelMetaName) =>
+              label.metaPropsDefaultMapInnerString.get(labelMetaName) match {
+                case None => acc
+                case Some(defaultValue) =>
+                  val labelMeta = label.metaPropsInvMap(labelMetaName)
+                  acc + (labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultValue))
+              }
+            }
+          }
+
+        val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+        edgeWithScore.copy(edge = newEdge)
       }
-      toEdge(parts)
-    } else if (logType == "vertex" | logType == "v") {
-      toVertex(parts)
-    } else {
-      throw new GraphExceptions.JsonParseException("log type is not exist in log.")
-    }
+    } else Nil
 
-    element
-  } recover {
-    case e: Exception =>
-      logger.error(s"[toElement]: $e", e)
-      None
-  } get
+    // skip
+    if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores
+    else {
+      val degreeScore = 0.0
 
+      val sampled =
+        if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+        else edgeWithScores
 
-  def toVertex(s: String): Option[Vertex] = {
-    toVertex(GraphUtil.split(s))
-  }
+      val withScores = for {
+        edgeWithScore <- sampled
+      } yield {
+        val edge = edgeWithScore.edge
+        val edgeScore = edgeWithScore.score
+        val score = queryParam.scorePropagateOp match {
+          case "plus" => edgeScore + prevScore
+          case "divide" =>
+            if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+            else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
+          case _ => edgeScore * prevScore
+        }
 
-  def toEdge(s: String): Option[Edge] = {
-    toEdge(GraphUtil.split(s))
+        val tsVal = processTimeDecay(queryParam, edge)
+        val newScore = degreeScore + score
+        //          val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
+        val newEdge = edge.copy(parentEdges = parents)
+        edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
+      }
+
+      val normalized =
+        if (queryParam.shouldNormalize) normalize(withScores)
+        else withScores
+
+      normalized
+    }
   }
 
-  def toEdge(parts: Array[String]): Option[Edge] = Try {
-    val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
-    val tempDirection = if (parts.length >= 8) parts(7) else "out"
-    val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
-    val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
-    Option(edge)
-  } recover {
-    case e: Exception =>
-      logger.error(s"[toEdge]: $e", e)
-      throw e
-  } get
+  private def buildResult[T](query: Query,
+                             stepIdx: Int,
+                             stepResultLs: Seq[(QueryRequest, StepResult)],
+                             parentEdges: Map[VertexId, Seq[EdgeWithScore]])
+                            (createFunc: (EdgeWithScore, Set[LabelMeta]) => T)
+                            (implicit ev: WithScore[T]): ListBuffer[T] = {
+    import scala.collection._
 
-  def toVertex(parts: Array[String]): Option[Vertex] = Try {
-    val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
-    val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
-    Option(vertex)
-  } recover {
-    case e: Throwable =>
-      logger.error(s"[toVertex]: $e", e)
-      throw e
-  } get
+    val results = ListBuffer.empty[T]
+    val sequentialLs: ListBuffer[(HashKey, FilterHashKey, T, QueryParam)] = ListBuffer.empty
+    val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, T)]] = mutable.HashMap.empty
+    val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
+    val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
 
-  def initStorage(graph: Graph, config: Config)(ec: ExecutionContext) = {
-    config.getString("s2graph.storage.backend") match {
-      case "hbase" => new AsynchbaseStorage(graph, config)(ec)
-      case _ => throw new RuntimeException("not supported storage.")
+    var numOfDuplicates = 0
+    val queryOption = query.queryOption
+    val step = query.steps(stepIdx)
+    val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet
+    val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet
+
+    stepResultLs.foreach { case (queryRequest, stepInnerResult) =>
+      val queryParam = queryRequest.queryParam
+      val label = queryParam.label
+      val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir)
+      val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir)
+
+      val propsSelectColumns = (for {
+        column <- queryOption.propsSelectColumns
+        labelMeta <- label.metaPropsInvMap.get(column)
+      } yield labelMeta).toSet
+
+      for {
+        edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges)
+      } {
+        val edge = edgeWithScore.edge
+        val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
+        //        params += (hashKey -> queryParam) //
+
+        /** check if this edge should be excluded. */
+        if (shouldBeExcluded) {
+          edgesToExclude.add(filterHashKey)
+        } else {
+          if (shouldBeIncluded) {
+            edgesToInclude.add(filterHashKey)
+          }
+          val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns)
+
+          sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
+          duplicates.get(hashKey) match {
+            case None =>
+              val newLs = ListBuffer.empty[(FilterHashKey, T)]
+              newLs += (filterHashKey -> newEdgeWithScore)
+              duplicates += (hashKey -> newLs) //
+            case Some(old) =>
+              numOfDuplicates += 1
+              old += (filterHashKey -> newEdgeWithScore) //
+          }
+        }
+      }
     }
+
+
+    if (numOfDuplicates == 0) {
+      // no duplicates at all.
+      for {
+        (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs
+        if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+      } {
+        results += edgeWithScore
+      }
+    } else {
+      // need to resolve duplicates.
+      val seen = new mutable.HashSet[HashKey]()
+      for {
+        (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs
+        if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+        if !seen.contains(hashKey)
+      } {
+        //        val queryParam = params(hashKey)
+        processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) =>
+          if (ev.score(duplicate) >= queryParam.threshold) {
+            seen += hashKey
+            results += duplicate
+          }
+        }
+      }
+    }
+    results
   }
+
 }
 
 class Graph(_config: Config)(implicit val ec: ExecutionContext) {
+
+  import Graph._
+
   val config = _config.withFallback(Graph.DefaultConfig)
 
   Model.apply(config)
   Model.loadCache()
 
-  // TODO: Make storage client by config param
-  val storage = Graph.initStorage(this, config)(ec)
+  val MaxRetryNum = config.getInt("max.retry.number")
+  val MaxBackOff = config.getInt("max.back.off")
+  val BackoffTimeout = config.getInt("back.off.timeout")
+  val DeleteAllFetchCount = config.getInt("delete.all.fetch.count")
+  val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
+  val FailProb = config.getDouble("hbase.fail.prob")
+  val LockExpireDuration = config.getInt("lock.expire.time")
+  val MaxSize = config.getInt("future.cache.max.size")
+  val ExpireAfterWrite = config.getInt("future.cache.expire.after.write")
+  val ExpireAfterAccess = config.getInt("future.cache.expire.after.access")
 
+  val scheduledEx = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
+
+  private def confWithFallback(conf: Config): Config = {
+    conf.withFallback(config)
+  }
+
+  /**
+    * TODO: we need some way to handle malformed configuration for storage.
+    */
+  val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = {
+    val labels = Label.findAll()
+    val services = Service.findAll()
+
+    val labelConfigs = labels.flatMap(_.toStorageConfig)
+    val serviceConfigs = services.flatMap(_.toStorageConfig)
+
+    val configs = (labelConfigs ++ serviceConfigs).map { conf =>
+      confWithFallback(conf)
+    }.toSet
+
+    val pools = new java.util.HashMap[Config, Storage[_, _]]()
+    configs.foreach { config =>
+      pools.put(config, Graph.initStorage(this, config)(ec))
+    }
+
+    val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]()
+
+    labels.foreach { label =>
+      if (label.storageConfigOpt.isDefined) {
+        m += (s"label:${label.label}" -> pools(label.storageConfigOpt.get))
+      }
+    }
+
+    services.foreach { service =>
+      if (service.storageConfigOpt.isDefined) {
+        m += (s"service:${service.serviceName}" -> pools(service.storageConfigOpt.get))
+      }
+    }
+
+    m
+  }
+
+  val defaultStorage: Storage[_, _] = Graph.initStorage(this, config)(ec)
+
+  /** QueryLevel FutureCache */
+  val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty)
 
   for {
     entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey)
     (k, v) = (entry.getKey, entry.getValue)
   } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
 
-  /** select */
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = storage.checkEdges(params)
+  def getStorage(service: Service): Storage[_, _] = {
+    storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
+  }
+
+  def getStorage(label: Label): Storage[_, _] = {
+    storagePool.getOrElse(s"label:${label.label}", defaultStorage)
+  }
+
+  def flushStorage(): Unit = {
+    storagePool.foreach { case (_, storage) =>
+
+      /** flush is blocking */
+      storage.flush()
+    }
+  }
+
+  def fallback = Future.successful(StepResult.Empty)
+
+  def checkEdges(edges: Seq[Edge]): Future[StepResult] = {
+    val futures = for {
+      edge <- edges
+    } yield {
+      fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
+        edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+      }
+    }
+
+    Future.sequence(futures).map { edgeWithScoreLs =>
+      val edgeWithScores = edgeWithScoreLs.flatten
+      StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil)
+    }
+  }
+
+  //  def checkEdges(edges: Seq[Edge]): Future[StepResult] = storage.checkEdges(edges)
+
+  def getEdges(q: Query): Future[StepResult] = {
+    Try {
+      if (q.steps.isEmpty) {
+        // TODO: this should be get vertex query.
+        fallback
+      } else {
+        val filterOutFuture = q.queryOption.filterOutQuery match {
+          case None => fallback
+          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+        }
+        for {
+          stepResult <- getEdgesStepInner(q)
+          filterOutInnerResult <- filterOutFuture
+        } yield {
+          if (filterOutInnerResult.isEmpty) stepResult
+          else StepResult.filterOut(this, q.queryOption, stepResult, filterOutInnerResult)
+        }
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesAsync: $e", e)
+        fallback
+    } get
+  }
+
+  def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
+    Try {
+      if (q.steps.isEmpty) fallback
+      else {
+
+        val queryOption = q.queryOption
+        def fetch: Future[StepResult] = {
+          val startStepInnerResult = QueryResult.fromVertices(this, q)
+          q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
+            for {
+              prevStepInnerResult <- prevStepInnerResultFuture
+              currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult)
+            } yield {
+              currentStepInnerResult.copy(
+                accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors,
+                failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount
+              )
+            }
+          }
+        }
+
+        fetch
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesAsync: $e", e)
+        fallback
+    } get
+  }
+
+  def fetchStep(orgQuery: Query,
+                stepIdx: Int,
+                stepInnerResult: StepResult,
+                buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
+    if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
+    else {
+      val edgeWithScoreLs = stepInnerResult.edgeWithScores
+
+      val q = orgQuery
+      val queryOption = orgQuery.queryOption
+      val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
+      val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
+      val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
+      val step = q.steps(stepIdx)
+
+     val alreadyVisited =
+        if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
+        else alreadyVisitedVertices(stepInnerResult.edgeWithScores)
+
+      val initial = (Map.empty[Vertex, Double], Map.empty[Vertex, ArrayBuffer[EdgeWithScore]])
+      val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) =>
+        val key = edgeWithScore.edge.tgtVertex
+        val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score
+        val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore])
+        buffer += edgeWithScore
+        (sum + (key -> newScore), group + (key -> buffer))
+      }
+      val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold)
+      val prevStepTgtVertexIdEdges = grouped.map(t => t._1.id -> t._2)
+
+      val nextStepSrcVertices = if (prevStepLimit >= 0) {
+        groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
+      } else {
+        groupedByFiltered.toSeq
+      }
+
+      val queryRequests = for {
+        (vertex, prevStepScore) <- nextStepSrcVertices
+        queryParam <- step.queryParams
+      } yield {
+        val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
+        val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0
+        QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight)
+      }
+
+      val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
+
+      filterEdges(orgQuery, stepIdx, queryRequests,
+        fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec)
+    }
+  }
+
+
+  /**
+    * Responsible for firing parallel fetch calls into storage and creating a future that will return the merged result.
+    *
+    * @param queryRequests
+    * @param prevStepEdges
+    * @return
+    */
+  def fetches(queryRequests: Seq[QueryRequest],
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
+
+    val reqWithIdxs = queryRequests.zipWithIndex
+    val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label)
+    val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
+      for {
+        prev <- prevFuture
+        cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
+      } yield {
+        prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
+      }
+    }
+    aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) }
+  }
+
+
+  def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
+    Try {
+      if (mq.queries.isEmpty) fallback
+      else {
+        val filterOutFuture = mq.queryOption.filterOutQuery match {
+          case None => fallback
+          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+        }
+
+        val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
+        for {
+          multiQueryResults <- multiQueryFutures
+          filterOutInnerResult <- filterOutFuture
+        } yield {
+          StepResult.merges(mq.queryOption, multiQueryResults, mq.weights, filterOutInnerResult)
+        }
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesAsync: $e", e)
+        fallback
+    } get
+  }
+
+
+  def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = {
+    /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
+      * so use empty cacheKey.
+      * */
+    val queryParam = QueryParam(labelName = edge.label.label,
+      direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
+      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
+      cacheTTLInMillis = -1)
+    val q = Query.toQuery(Seq(edge.srcVertex), queryParam)
+    val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
+
+    val storage = getStorage(edge.label)
+    storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
+      val (edgeOpt, kvOpt) =
+        if (kvs.isEmpty) (None, None)
+        else {
+          val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
+          val _kvOpt = kvs.headOption
+          (snapshotEdgeOpt, _kvOpt)
+        }
+      (queryParam, edgeOpt, kvOpt)
+    } recoverWith { case ex: Throwable =>
+      logger.error(s"fetchQueryParam failed. fallback return.", ex)
+      throw FetchTimeoutException(s"${edge.toLogString}")
+    }
+  }
+
+  def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
+    val verticesWithIdx = vertices.zipWithIndex
+    val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
+      getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
+    }
+
+    Future.sequence(futures).map { ls =>
+      ls.flatten.toSeq.sortBy(_._2).map(_._1)
+    }
+  }
+
+  /** mutate */
+  def deleteAllAdjacentEdges(srcVertices: Seq[Vertex],
+                             labels: Seq[Label],
+                             dir: Int,
+                             ts: Long): Future[Boolean] = {
+
+    val requestTs = ts
+    val vertices = srcVertices
+    /** create query per label */
+    val queries = for {
+      label <- labels
+    } yield {
+      val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir),
+        offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw)
+      val step = Step(List(queryParam))
+      Query(vertices, Vector(step))
+    }
+
+    //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
+    val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
+      fetchAndDeleteAll(queries, requestTs)
+    } { case (allDeleted, deleteSuccess) =>
+      allDeleted && deleteSuccess
+    }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
+
+    retryFuture onFailure {
+      case ex =>
+        logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
+    }
 
-  def getEdges(q: Query): Future[StepResult] = storage.getEdges(q)
+    retryFuture
+  }
 
-  def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = storage.getEdgesMultiQuery(mq)
+  def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
+    val future = for {
+      stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true)))
+      (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
+    } yield {
+      //        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
+      (allDeleted, ret)
+    }
 
-  def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices)
+    Extensions.retryOnFailure(MaxRetryNum) {
+      future
+    } {
+      logger.error(s"fetch and deleteAll failed.")
+      (true, false)
+    }
 
-  /** write */
-  def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
-    storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
+  }
 
-  def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[Boolean]] =
-    storage.mutateElements(elements, withWait)
+  def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
+                              requestTs: Long): Future[(Boolean, Boolean)] = {
+    stepInnerResultLs.foreach { stepInnerResult =>
+      if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
+    }
+    val futures = for {
+      stepInnerResult <- stepInnerResultLs
+      deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
+      if deleteStepInnerResult.edgeWithScores.nonEmpty
+    } yield {
+      val head = deleteStepInnerResult.edgeWithScores.head
+      val label = head.edge.label
+      val ret = label.schemaVersion match {
+        case HBaseType.VERSION3 | HBaseType.VERSION4 =>
+          if (label.consistencyLevel == "strong") {
+            /**
+              * read: snapshotEdge on queryResult = O(N)
+              * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
+              */
+            mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity))
+          } else {
+            getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
+          }
+        case _ =>
 
-  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
+          /**
+            * read: x
+            * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
+            */
+          getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
+      }
+      ret
+    }
 
-  def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateVertices(vertices, withWait)
+    if (futures.isEmpty) {
+      // all deleted.
+      Future.successful(true -> true)
+    } else {
+      Future.sequence(futures).map { rets => false -> rets.forall(identity) }
+    }
+  }
 
-  def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges, withWait)
+  def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = {
+    val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
+      (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
+    }
+    if (filtered.isEmpty) StepResult.Empty
+    else {
+      val head = filtered.head
+      val label = head.edge.label
+      val edgeWithScoreLs = filtered.map { edgeWithScore =>
+        val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
+          case "strong" =>
+            val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
+                Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
+            (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
+          case _ =>
+            val oldEdge = edgeWithScore.edge
+            (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
+        }
+
+        val copiedEdge =
+          edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
+
+        val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
+        //      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
+        edgeToDelete
+      }
+      //Degree edge?
+      StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false)
+    }
+  }
+
+  //  def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
+  //    storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
+
+  def mutateElements(elements: Seq[GraphElement],
+                     withWait: Boolean = false): Future[Seq[Boolean]] = {
+
+    val edgeBuffer = ArrayBuffer[(Edge, Int)]()
+    val vertexBuffer = ArrayBuffer[(Vertex, Int)]()
+
+    elements.zipWithIndex.foreach {
+      case (e: Edge, idx: Int) => edgeBuffer.append((e, idx))
+      case (v: Vertex, idx: Int) => vertexBuffer.append((v, idx))
+      case any@_ => logger.error(s"Unknown type: ${any}")
+    }
+
+    val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result =>
+      edgeBuffer.map(_._2).zip(result)
+    }
+
+    val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result =>
+      vertexBuffer.map(_._2).zip(result)
+    }
+
+    val graphFuture = for {
+      edgesMutated <- edgeFutureWithIdx
+      verticesMutated <- vertexFutureWithIdx
+    } yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2)
+
+    graphFuture
+
+  }
+
+  //  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
+
+  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = {
+    val edgeWithIdxs = edges.zipWithIndex
+
+    val (strongEdges, weakEdges) =
+      edgeWithIdxs.partition { case (edge, idx) =>
+        val e = edge
+        e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")
+      }
+
+    val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.label.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
+      val futures = edgeWithIdxs.groupBy(_._1.label).map { case (label, edgeGroup) =>
+        val storage = getStorage(label)
+        val edges = edgeGroup.map(_._1)
+        val idxs = edgeGroup.map(_._2)
+
+        /** multiple edges with weak consistency level will be processed as batch */
+        val mutations = edges.flatMap { edge =>
+          val (_, edgeUpdate) =
+            if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge)
+            else Edge.buildOperation(None, Seq(edge))
+
+          storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate) ++ storage.snapshotEdgeMutations(edgeUpdate) ++ storage.increments(edgeUpdate)
+        }
+
+        storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+          idxs.map(idx => idx -> ret)
+        }
+      }
+      Future.sequence(futures)
+    }
+    val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") }
+
+    val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
+      deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts).map(idx -> _)
+    }
+
+    val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.label }.map { case (label, edgeGroup) =>
+      val edges = edgeGroup.map(_._1)
+      val idxs = edgeGroup.map(_._2)
+      val storage = getStorage(label)
+      storage.mutateStrongEdges(edges, withWait = true).map { rets =>
+        idxs.zip(rets)
+      }
+    }
+
+    for {
+      weak <- Future.sequence(weakEdgesFutures)
+      deleteAll <- Future.sequence(deleteAllFutures)
+      strong <- Future.sequence(strongEdgesFutures)
+    } yield {
+      (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2)
+    }
+  }
+
+  def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = {
+    val verticesWithIdx = vertices.zipWithIndex
+    val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
+      getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
+    }
+    Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
+  }
+
+  def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
+    val edgesWithIdx = edges.zipWithIndex
+    val futures = edgesWithIdx.groupBy { case (e, idx) => e.label }.map { case (label, edgeGroup) =>
+      getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+    }
+    Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
+  }
+
+  def updateDegree(edge: Edge, degreeVal: Long = 0): Future[Boolean] = {
+    val label = edge.label
+
+    val storage = getStorage(label)
+    val kvs = storage.buildDegreePuts(edge, degreeVal)
+
+    storage.writeToStorage(edge.label.service.cluster, kvs, withWait = true)
+  }
 
   def shutdown(): Unit = {
-    storage.flush()
+    flushStorage()
     Model.shutdown()
   }
+
+  def addEdge(srcId: Any,
+              tgtId: Any,
+              labelName: String,
+              direction: String = "out",
+              props: Map[String, Any] = Map.empty,
+              ts: Long = System.currentTimeMillis(),
+              operation: String = "insert",
+              withWait: Boolean = true): Future[Boolean] = {
+
+    val innerEdges = Seq(Edge.toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation))
+    mutateEdges(innerEdges, withWait).map(_.headOption.getOrElse(false))
+  }
+
+  def addVertex(serviceName: String,
+                columnName: String,
+                id: Any,
+                props: Map[String, Any] = Map.empty,
+                ts: Long = System.currentTimeMillis(),
+                operation: String = "insert",
+                withWait: Boolean = true): Future[Boolean] = {
+    val innerVertices = Seq(Vertex.toVertex(serviceName, columnName, id, props.toMap, ts, operation))
+    mutateVertices(innerVertices, withWait).map(_.headOption.getOrElse(false))
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
index 2f090cf..2cc1063 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
@@ -20,6 +20,24 @@
 package org.apache.s2graph.core
 
 object GraphExceptions {
+  var fillStckTrace = true
+  class BaseException(msg : String) extends Exception(msg){
+    override def fillInStackTrace : Exception = {
+      if(fillStckTrace) super.fillInStackTrace()
+      this
+    }
+  }
+  class NoStackException(msg : String) extends Exception(msg){
+    override def fillInStackTrace : Exception = {
+      this
+    }
+  }
+
+  class NoStackCauseException(msg : String, ex: Throwable ) extends Exception(msg, ex){
+    override def fillInStackTrace : Exception = {
+      this
+    }
+  }
 
   case class JsonParseException(msg: String) extends Exception(msg)
 
@@ -43,7 +61,11 @@ object GraphExceptions {
 
   case class InvalidHTableException(msg: String) extends Exception(msg)
 
-  case class FetchTimeoutException(msg: String) extends Exception(msg)
+  case class FetchTimeoutException(msg: String) extends NoStackException(msg)
 
   case class DropRequestException(msg: String) extends Exception(msg)
+
+  case class FetchAllStepFailException(msg: String) extends Exception(msg)
+
+  case class AccessDeniedException(amsg: String) extends Exception(amsg)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
index ebfee7a..939b596 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
@@ -50,8 +50,8 @@ object GraphUtil {
     val d = direction.trim().toLowerCase match {
       case "directed" | "d" => Some(0)
       case "undirected" | "u" => Some(2)
-      case "out" => Some(0)
-      case "in" => Some(1)
+      case "out" | "o" => Some(0)
+      case "in" | "i" => Some(1)
       case _ => None
     }
     d.map(x => x.toByte)
@@ -61,8 +61,8 @@ object GraphUtil {
     direction.trim().toLowerCase match {
       case "directed" | "d" => 0
       case "undirected" | "u" => 2
-      case "out" => 0
-      case "in" => 1
+      case "out" | "o" => 0
+      case "in" | "i" => 1
       case _ => 2
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 9ef7c14..89acc63 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -25,6 +25,7 @@ import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.types.HBaseType._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.utils.logger
 import play.api.libs.json.Reads._
 import play.api.libs.json._
 
@@ -104,8 +105,8 @@ object Management {
     }
   }
 
-  def findLabel(labelName: String): Option[Label] = {
-    Label.findByName(labelName, useCache = false)
+  def findLabel(labelName: String, useCache: Boolean = false): Option[Label] = {
+    Label.findByName(labelName, useCache = useCache)
   }
 
   def deleteLabel(labelName: String) = {
@@ -117,6 +118,16 @@ object Management {
     }
   }
 
+  def markDeletedLabel(labelName: String) = {
+    Model withTx { implicit session =>
+      Label.findByName(labelName, useCache = false).foreach { label =>
+        // rename & delete_at column filled with current time
+        Label.markDeleted(label)
+      }
+      labelName
+    }
+  }
+
   def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = {
     Model withTx { implicit session =>
       val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found"))
@@ -205,12 +216,12 @@ object Management {
 
   }
 
-  def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(Byte, InnerValLike)] = {
+  def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(LabelMeta, InnerValLike)] = {
     val props = for {
       (k, v) <- js
       meta <- label.metaPropsInvMap.get(k)
       innerVal <- jsValueToInnerVal(v, meta.dataType, label.schemaVersion)
-    } yield (meta.seq, innerVal)
+    } yield (meta, innerVal)
 
     props
   }
@@ -248,15 +259,18 @@ object Management {
 
 class Management(graph: Graph) {
   import Management._
-  val storage = graph.storage
 
-  def createTable(zkAddr: String,
+  def createStorageTable(zkAddr: String,
                   tableName: String,
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit =
-    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm)
+                  compressionAlgorithm: String = DefaultCompressionAlgorithm,
+                  replicationScopeOpt: Option[Int] = None,
+                  totalRegionCount: Option[Int] = None): Unit = {
+    graph.defaultStorage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm, replicationScopeOpt, totalRegionCount)
+  }
+
 
   /** HBase specific code */
   def createService(serviceName: String,
@@ -265,9 +279,9 @@ class Management(graph: Graph) {
                     compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = {
 
     Model withTx { implicit session =>
-      val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
-      /* create hbase table for service */
-      storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
+      val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm)
+      /** create hbase table for service */
+      graph.getStorage(service).createTable(service.cluster, service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
       service
     }
   }
@@ -292,35 +306,26 @@ class Management(graph: Graph) {
                   compressionAlgorithm: String = "gz",
                   options: Option[String]): Try[Label] = {
 
-    val labelOpt = Label.findByName(label, useCache = false)
+    if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : ${LABEL_NAME_MAX_LENGTH}} )")
+    if (hTableName.isEmpty && hTableTTL.isDefined) throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
 
+    val labelOpt = Label.findByName(label, useCache = false)
     Model withTx { implicit session =>
-      labelOpt match {
-        case Some(l) =>
-          throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
-        case None =>
-          /** create all models */
-          if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : 40 )")
-          val newLabel = Label.insertAll(label,
-            srcServiceName, srcColumnName, srcColumnType,
-            tgtServiceName, tgtColumnName, tgtColumnType,
-            isDirected, serviceName, indices, props, consistencyLevel,
-            hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
-
-          /* create hbase table */
-          val service = newLabel.service
-          (hTableName, hTableTTL) match {
-            case (None, None) => // do nothing
-            case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
-            case (Some(hbaseTableName), None) =>
-              // create own hbase table with default ttl on service level.
-              storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
-            case (Some(hbaseTableName), Some(hbaseTableTTL)) =>
-              // create own hbase table with own ttl.
-              storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm)
-          }
-          newLabel
-      }
+      if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.")
+
+      /** create all models */
+      val newLabel = Label.insertAll(label,
+        srcServiceName, srcColumnName, srcColumnType,
+        tgtServiceName, tgtColumnName, tgtColumnType,
+        isDirected, serviceName, indices, props, consistencyLevel,
+        hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
+
+      /** create hbase table */
+      val storage = graph.getStorage(newLabel)
+      val service = newLabel.service
+      storage.createTable(service.cluster, newLabel.hbaseTableName, List("e", "v"), service.preSplitSize, newLabel.hTableTTL, newLabel.compressionAlgorithm)
+
+      newLabel
     }
   }
 
@@ -331,12 +336,11 @@ class Management(graph: Graph) {
    * copy label when if oldLabel exist and newLabel do not exist.
    * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
    */
-  def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
+  def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]): Try[Label] = {
     val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
-    if (Label.findByName(newLabelName, useCache = false).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
 
-    val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
-    val allIndices = old.indices.map { index => Index(index.name, index.propNames) }
+    val allProps = old.metas(useCache = false).map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
+    val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames) }
 
     createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
       old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
@@ -344,4 +348,13 @@ class Management(graph: Graph) {
       allIndices, allProps,
       old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm, old.options)
   }
+
+  def getCurrentStorageInfo(labelName: String): Try[Map[String, String]] = for {
+    label <- Try(Label.findByName(labelName, useCache = false).get)
+  } yield {
+    val storage = graph.getStorage(label)
+    storage.info
+  }
+
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 7cc2420..6c8563c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -19,12 +19,18 @@
 
 package org.apache.s2graph.core
 
-import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import java.util.Base64
+
+import com.google.protobuf.ByteString
+import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
 import org.apache.s2graph.core.mysqls._
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
 import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.rest.RequestParser
+import org.apache.s2graph.core.utils.logger
 import play.api.libs.json.{Json, _}
 
+import scala.collection.JavaConversions._
 import scala.collection.immutable
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
@@ -40,74 +46,100 @@ object PostProcess {
    * Result Entity score field name
    */
   val emptyDegrees = Seq.empty[JsValue]
-  val timeoutResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isTimeout" -> true)
-  val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isEmpty" -> true)
+  val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isEmpty" -> true, "rpcFail" -> 0)
+
   def badRequestResults(ex: => Exception) = ex match {
     case ex: BadQueryException => Json.obj("message" -> ex.msg)
     case _ => Json.obj("message" -> ex.getMessage)
   }
 
-  val SCORE_FIELD_NAME = "scoreSum"
-  val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props")
-
-
   def s2EdgeParent(graph: Graph,
+                   queryOption: QueryOption,
                    parentEdges: Seq[EdgeWithScore]): JsValue = {
     if (parentEdges.isEmpty) JsNull
     else {
       val ancestors = for {
         current <- parentEdges
-        parents = s2EdgeParent(graph, current.edge.parentEdges) if parents != JsNull
+        parents = s2EdgeParent(graph, queryOption, current.edge.parentEdges) if parents != JsNull
       } yield {
           val s2Edge = current.edge.originalEdgeOpt.getOrElse(current.edge)
-          s2EdgeToJsValue(s2Edge, current.score, false, parents = parents)
+          s2EdgeToJsValue(queryOption, current.copy(edge = s2Edge), false, parents = parents, checkSelectColumns = true)
         }
       Json.toJson(ancestors)
     }
   }
 
-  def s2EdgeToJsValue(s2Edge: Edge,
-                      score: Double,
+  def s2EdgeToJsValue(queryOption: QueryOption,
+                      edgeWithScore: EdgeWithScore,
                       isDegree: Boolean = false,
-                      parents: JsValue = JsNull): JsValue = {
+                      parents: JsValue = JsNull,
+                      checkSelectColumns: Boolean = false): JsValue = {
+    //    val builder = immutable.Map.newBuilder[String, JsValue]
+    val builder = ArrayBuffer.empty[(String, JsValue)]
+    val s2Edge = edgeWithScore.edge
+    val score = edgeWithScore.score
+    val label = edgeWithScore.label
     if (isDegree) {
-      Json.obj(
-        "from" -> anyValToJsValue(s2Edge.srcId),
-        "label" -> s2Edge.labelName,
-        LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degreeSeq).innerVal.value)
-      )
+      builder += ("from" -> anyValToJsValue(s2Edge.srcId).get)
+      builder += ("label" -> anyValToJsValue(label.label).get)
+      builder += ("direction" -> anyValToJsValue(s2Edge.direction).get)
+      builder += (LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degree).innerVal.value).get)
+      JsObject(builder)
     } else {
-      Json.obj("from" -> anyValToJsValue(s2Edge.srcId),
-        "to" -> anyValToJsValue(s2Edge.tgtId),
-        "label" -> s2Edge.labelName,
-        "score" -> score,
-        "props" -> JSONParser.propertiesToJson(s2Edge.properties),
-        "direction" -> s2Edge.direction,
-        "timestamp" -> anyValToJsValue(s2Edge.ts),
-        "parents" -> parents
-      )
-    }
-  }
+      if (queryOption.withScore) builder += ("score" -> anyValToJsValue(score).get)
 
-  def withImpressionId(queryOption: QueryOption,
-                        size: Int,
-                        degrees: Seq[JsValue],
-                        results: Seq[JsValue]): JsValue = {
-    queryOption.impIdOpt match {
-      case None => Json.obj(
-        "size" -> size,
-        "degrees" -> degrees,
-        "results" -> results
-      )
-      case Some(impId) =>
-        Json.obj(
-          "size" -> size,
-          "degrees" -> degrees,
-          "results" -> results,
-          Experiment.ImpressionKey -> impId
-        )
+      if (queryOption.selectColumns.isEmpty) {
+        builder += ("from" -> anyValToJsValue(s2Edge.srcId).get)
+        builder += ("to" -> anyValToJsValue(s2Edge.tgtId).get)
+        builder += ("label" -> anyValToJsValue(label.label).get)
+
+        val innerProps = ArrayBuffer.empty[(String, JsValue)]
+        for {
+          (labelMeta, v) <- edgeWithScore.edge.propsWithTs
+          jsValue <- anyValToJsValue(v.innerVal.value)
+        } {
+          innerProps += (labelMeta.name -> jsValue)
+        }
+
+
+        builder += ("props" -> JsObject(innerProps))
+        builder += ("direction" -> anyValToJsValue(s2Edge.direction).get)
+        builder += ("timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get)
+        builder += ("_timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) // backward compatibility
+        if (parents != JsNull) builder += ("parents" -> parents)
+        //          Json.toJson(builder.result())
+        JsObject(builder)
+      } else {
+        queryOption.selectColumnsMap.foreach { case (columnName, _) =>
+          columnName match {
+            case "from" => builder += ("from" -> anyValToJsValue(s2Edge.srcId).get)
+            case "_from" => builder += ("_from" -> anyValToJsValue(s2Edge.srcId).get)
+            case "to" => builder += ("to" -> anyValToJsValue(s2Edge.tgtId).get)
+            case "_to" => builder += ("_to" -> anyValToJsValue(s2Edge.tgtId).get)
+            case "label" => builder += ("label" -> anyValToJsValue(label.label).get)
+            case "direction" => builder += ("direction" -> anyValToJsValue(s2Edge.direction).get)
+            case "timestamp" => builder += ("timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get)
+            case "_timestamp" => builder += ("_timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get)
+            case _ => // not a reserved field name: selected property columns are handled by the props loop below
+
+          }
+        }
+        val innerProps = ArrayBuffer.empty[(String, JsValue)]
+        for {
+          (labelMeta, v) <- edgeWithScore.edge.propsWithTs
+          if !checkSelectColumns || queryOption.selectColumnsMap.contains(labelMeta.name)
+          jsValue <- anyValToJsValue(v.innerVal.value)
+        } {
+          innerProps += (labelMeta.name -> jsValue)
+        }
+
+        builder += ("props" -> JsObject(innerProps))
+        if (parents != JsNull) builder += ("parents" -> parents)
+        JsObject(builder)
+      }
     }
   }
+
   def s2VertexToJson(s2Vertex: Vertex): Option[JsValue] = {
     val props = for {
       (k, v) <- s2Vertex.properties
@@ -140,6 +172,7 @@ object PostProcess {
 
     val kvs = new ArrayBuffer[(String, JsValue)]()
 
+
     kvs.append("size" -> JsNumber(size))
     kvs.append("degrees" -> JsArray(degrees))
     kvs.append("results" -> JsArray(results))
@@ -149,28 +182,71 @@ object PostProcess {
     JsObject(kvs)
   }
 
-  def toJson(graph: Graph,
-             queryOption: QueryOption,
-             stepResult: StepResult): JsValue = {
+  def buildJsonWith(js: JsValue)(implicit fn: (String, JsValue) => JsValue): JsValue = js match {
+    case JsObject(obj) => JsObject(obj.map { case (k, v) => k -> buildJsonWith(fn(k, v)) })
+    case JsArray(arr) => JsArray(arr.map(buildJsonWith(_)))
+    case _ => js
+  }
+
+  def toJson(orgQuery: Option[JsValue])(graph: Graph,
+                                        queryOption: QueryOption,
+                                        stepResult: StepResult): JsValue = {
+
+    // one list of Base64-encoded cursors per step, e.g. [[cursor, cursor], [cursor]]
+    lazy val cursors: Seq[Seq[String]] = stepResult.accumulatedCursors.map { stepCursors =>
+      stepCursors.map { cursor => new String(Base64.getEncoder.encode(cursor)) }
+    }
+
+    lazy val cursorJson: JsValue = Json.toJson(cursors)
+
+    // build nextQuery with (original query + cursors)
+    lazy val nextQuery: Option[JsValue] = {
+      if (cursors.exists { stepCursors => stepCursors.exists(_ != "") }) {
+        val cursorIter = cursors.iterator
 
+        orgQuery.map { query =>
+          buildJsonWith(query) { (key, js) =>
+            if (key == "step") {
+              val currentCursor = cursorIter.next
+              val res = js.as[Seq[JsObject]].toStream.zip(currentCursor).filterNot(_._2 == "").map { case (obj, cursor) =>
+                val label = (obj \ "label").as[String]
+                if (Label.findByName(label).get.schemaVersion == "v4") obj + ("cursor" -> JsString(cursor))
+                else {
+                  val limit = (obj \ "limit").asOpt[Int].getOrElse(RequestParser.defaultLimit)
+                  val offset = (obj \ "offset").asOpt[Int].getOrElse(0)
+                  obj + ("offset" -> JsNumber(offset + limit))
+                }
+              }
 
+              JsArray(res)
+            } else js
+          }
+        }
+      } else Option(JsNull)
+    }
+
+    val limitOpt = queryOption.limitOpt
+    val selectColumns = queryOption.selectColumnsMap
     val degrees =
-      if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(t.s2Edge, t.score, true))
+      if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(queryOption, t, true, JsNull))
       else emptyDegrees
 
     if (queryOption.groupBy.keys.isEmpty) {
       // no group by specified on query.
+      val results = if (limitOpt.isDefined) stepResult.edgeWithScores.take(limitOpt.get) else stepResult.edgeWithScores
+      val ls = results.map { t =>
+        val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.parentEdges) else JsNull
 
-      val ls = stepResult.results.map { t =>
-        val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
-        s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
+        s2EdgeToJsValue(queryOption, t, false, parents)
       }
-      withImpressionId(queryOption, ls.size, degrees, ls)
+
+      withOptionalFields(queryOption, ls.size, degrees, ls, stepResult.failCount, cursorJson, nextQuery)
     } else {
 
+      val grouped = if (limitOpt.isDefined) stepResult.grouped.take(limitOpt.get) else stepResult.grouped
       val results =
         for {
-          (groupByValues, (scoreSum, edges)) <- stepResult.grouped
+          (groupByValues, (scoreSum, edges)) <- grouped
         } yield {
           val groupByKeyValues = queryOption.groupBy.keys.zip(groupByValues).map { case (k, valueOpt) =>
             k -> valueOpt.flatMap(anyValToJsValue).getOrElse(JsNull)
@@ -185,8 +261,8 @@ object PostProcess {
             )
           } else {
             val agg = edges.map { t =>
-              val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
-              s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
+              val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.parentEdges) else JsNull
+              s2EdgeToJsValue(queryOption, t, false, parents)
             }
             val aggJson = Json.toJson(agg)
             Json.obj(
@@ -196,7 +272,8 @@ object PostProcess {
             )
           }
         }
-      withImpressionId(queryOption, results.size, degrees, results)
+
+      withOptionalFields(queryOption, results.size, degrees, results, stepResult.failCount, cursorJson, nextQuery)
     }
   }
-}
+}
\ No newline at end of file