You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/12/01 08:30:14 UTC

[04/10] incubator-s2graph git commit: [S2GRAPH-131]: Add actual implementation on interfaces from TinkerPop3 structure package. - Change core.Edge/Vertex/Graph to core.S2Edge/S2Vertex/S2Graph. - Implement base interfaces for tinkerpop3 structure packag

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
deleted file mode 100644
index 38477b4..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ /dev/null
@@ -1,1238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import java.util
-import java.util.concurrent.Executors
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.commons.configuration.Configuration
-import org.apache.s2graph.core.GraphExceptions.{FetchAllStepFailException, FetchTimeoutException, LabelNotExistException}
-import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls._
-import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
-import org.apache.s2graph.core.storage.{SKeyValue, Storage}
-import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.utils.{DeferCache, Extensions, SafeUpdateCache, logger}
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
-import org.apache.tinkerpop.gremlin.structure
-import org.apache.tinkerpop.gremlin.structure.Graph.Variables
-import org.apache.tinkerpop.gremlin.structure.{Graph => TpGraph, Transaction}
-import play.api.libs.json.{JsObject, Json}
-import scala.annotation.tailrec
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import scala.concurrent._
-import scala.util.{Random, Try}
-
-object Graph {
-
-  type HashKey = (Int, Int, Int, Int, Boolean)
-  type FilterHashKey = (Int, Int)
-
-
-  val DefaultScore = 1.0
-
-  private val DefaultConfigs: Map[String, AnyRef] = Map(
-    "hbase.zookeeper.quorum" -> "localhost",
-    "hbase.table.name" -> "s2graph",
-    "hbase.table.compression.algorithm" -> "gz",
-    "phase" -> "dev",
-    "db.default.driver" ->  "org.h2.Driver",
-    "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
-    "db.default.password" -> "graph",
-    "db.default.user" -> "graph",
-    "cache.max.size" -> java.lang.Integer.valueOf(10000),
-    "cache.ttl.seconds" -> java.lang.Integer.valueOf(60),
-    "hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
-    "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort),
-    "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000),
-    "max.retry.number" -> java.lang.Integer.valueOf(100),
-    "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
-    "max.back.off" -> java.lang.Integer.valueOf(100),
-    "back.off.timeout" -> java.lang.Integer.valueOf(1000),
-    "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
-    "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
-    "delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
-    "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
-    "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
-    "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
-    "future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
-    "query.future.cache.max.size" -> java.lang.Integer.valueOf(1000),
-    "query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
-    "query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
-    "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
-    "s2graph.storage.backend" -> "hbase",
-    "query.hardlimit" -> java.lang.Integer.valueOf(100000),
-    "hbase.zookeeper.znode.parent" -> "/hbase",
-    "query.log.sample.rate" -> Double.box(0.05)
-  )
-
-  var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
-
-
-
-  def initStorage(graph: Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = {
-    val storageBackend = config.getString("s2graph.storage.backend")
-    logger.info(s"[InitStorage]: $storageBackend")
-
-    storageBackend match {
-      case "hbase" => new AsynchbaseStorage(graph, config)(ec)
-      case _ => throw new RuntimeException("not supported storage.")
-    }
-  }
-
-  def parseCacheConfig(config: Config, prefix: String): Config = {
-    import scala.collection.JavaConversions._
-
-    val kvs = new java.util.HashMap[String, AnyRef]()
-    for {
-      entry <- config.entrySet()
-      (k, v) = (entry.getKey, entry.getValue) if k.startsWith(prefix)
-    } yield {
-      val newKey = k.replace(prefix, "")
-      kvs.put(newKey, v.unwrapped())
-    }
-    ConfigFactory.parseMap(kvs)
-  }
-
-  /** Global helper functions */
-  @tailrec
-  final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
-    if (range < sampleNumber || set.size == sampleNumber) set
-    else randomInt(sampleNumber, range, set + Random.nextInt(range))
-  }
-
-  def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
-    if (edges.size <= n) {
-      edges
-    } else {
-      val plainEdges = if (queryRequest.queryParam.offset == 0) {
-        edges.tail
-      } else edges
-
-      val randoms = randomInt(n, plainEdges.size)
-      var samples = List.empty[EdgeWithScore]
-      var idx = 0
-      plainEdges.foreach { e =>
-        if (randoms.contains(idx)) samples = e :: samples
-        idx += 1
-      }
-      samples
-    }
-  }
-
-  def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
-    val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
-    edgeWithScores.map { edgeWithScore =>
-      edgeWithScore.copy(score = edgeWithScore.score / sum)
-    }
-  }
-
-  def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, Vertex), Boolean] = {
-    val vertices = for {
-      edgeWithScore <- edgeWithScoreLs
-      edge = edgeWithScore.edge
-      vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
-    } yield (edge.labelWithDir, vertex) -> true
-
-    vertices.toMap
-  }
-
-  /** common methods for filter out, transform, aggregate queryResult */
-  def convertEdges(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = {
-    for {
-      convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
-    } yield convertedEdge
-  }
-
-  def processTimeDecay(queryParam: QueryParam, edge: Edge) = {
-    /* process time decay */
-    val tsVal = queryParam.timeDecay match {
-      case None => 1.0
-      case Some(timeDecay) =>
-        val tsVal = try {
-          val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name)
-          innerValWithTsOpt.map { innerValWithTs =>
-            val innerVal = innerValWithTs.innerVal
-            timeDecay.labelMeta.dataType match {
-              case InnerVal.LONG => innerVal.value match {
-                case n: BigDecimal => n.bigDecimal.longValue()
-                case _ => innerVal.toString().toLong
-              }
-              case _ => innerVal.toString().toLong
-            }
-          } getOrElse (edge.ts)
-        } catch {
-          case e: Exception =>
-            logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
-            edge.ts
-        }
-        val timeDiff = queryParam.timestamp - tsVal
-        timeDecay.decay(timeDiff)
-    }
-
-    tsVal
-  }
-
-  def processDuplicates[T](queryParam: QueryParam,
-                           duplicates: Seq[(FilterHashKey, T)])(implicit ev: WithScore[T]): Seq[(FilterHashKey, T)] = {
-
-    if (queryParam.label.consistencyLevel != "strong") {
-      //TODO:
-      queryParam.duplicatePolicy match {
-        case DuplicatePolicy.First => Seq(duplicates.head)
-        case DuplicatePolicy.Raw => duplicates
-        case DuplicatePolicy.CountSum =>
-          val countSum = duplicates.size
-          val (headFilterHashKey, headEdgeWithScore) = duplicates.head
-          Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum))
-        case _ =>
-          val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) }
-          val (headFilterHashKey, headEdgeWithScore) = duplicates.head
-          Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum))
-      }
-    } else {
-      duplicates
-    }
-  }
-
-  def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
-    val src = edge.srcVertex.innerId.hashCode()
-    val tgt = edge.tgtVertex.innerId.hashCode()
-    val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
-    val filterHashKey = (src, tgt)
-
-    (hashKey, filterHashKey)
-  }
-
-  def filterEdges(q: Query,
-                  stepIdx: Int,
-                  queryRequests: Seq[QueryRequest],
-                  queryResultLsFuture: Future[Seq[StepResult]],
-                  queryParams: Seq[QueryParam],
-                  alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty,
-                  buildLastStepInnerResult: Boolean = true,
-                  parentEdges: Map[VertexId, Seq[EdgeWithScore]])
-                 (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = {
-
-    queryResultLsFuture.map { queryRequestWithResultLs =>
-      val (cursors, failCount) = {
-        val _cursors = ArrayBuffer.empty[Array[Byte]]
-        var _failCount = 0
-
-        queryRequestWithResultLs.foreach { stepResult =>
-          _cursors.append(stepResult.cursors: _*)
-          _failCount += stepResult.failCount
-        }
-
-        _cursors -> _failCount
-      }
-
-
-      if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount)
-      else {
-        val isLastStep = stepIdx == q.steps.size - 1
-        val queryOption = q.queryOption
-        val step = q.steps(stepIdx)
-
-        val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs)
-        val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult
-        val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
-
-        if (shouldBuildInnerResults) {
-          val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
-            edgeWithScore
-          }
-
-          /** process step group by */
-          val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
-          StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
-
-        } else {
-          val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
-            val edge = edgeWithScore.edge
-            val score = edgeWithScore.score
-            val label = edgeWithScore.label
-
-            /** Select */
-            val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
-
-//            val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
-            val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
-
-            val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
-            /** OrderBy */
-            val orderByValues =
-             if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
-              else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
-
-            /** StepGroupBy */
-            val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
-
-            /** GroupBy */
-            val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
-
-            /** FilterOut */
-            val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
-
-            newEdgeWithScore.copy(orderByValues = orderByValues,
-              stepGroupByValues = stepGroupByValues,
-              groupByValues = groupByValues,
-              filterOutValues = filterOutValues)
-          }
-
-          /** process step group by */
-          val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
-
-          /** process ordered list */
-          val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
-
-          /** process grouped list */
-          val grouped =
-          if (queryOption.groupBy.keys.isEmpty) Nil
-          else {
-            val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]()
-            results.groupBy { edgeWithScore =>
-              //                edgeWithScore.groupByValues.map(_.map(_.toString))
-              edgeWithScore.groupByValues
-            }.foreach { case (k, ls) =>
-              val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption)
-
-              val newScoreSum = scoreSum
-
-              /**
-                * watch out here. by calling toString on Any, we lose type information which will be used
-                * later for toJson.
-                */
-              if (merged.nonEmpty) {
-                val newKey = merged.head.groupByValues
-                agg += (newKey -> (newScoreSum, merged))
-              }
-            }
-            agg.toSeq.sortBy(_._2._1 * -1)
-          }
-
-          StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount)
-        }
-      }
-    }
-  }
-
-  private def toEdgeWithScores(queryRequest: QueryRequest,
-                               stepResult: StepResult,
-                               parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = {
-    val queryOption = queryRequest.query.queryOption
-    val queryParam = queryRequest.queryParam
-    val prevScore = queryRequest.prevStepScore
-    val labelWeight = queryRequest.labelWeight
-    val edgeWithScores = stepResult.edgeWithScores
-
-    val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
-    val parents = if (shouldBuildParents) {
-      parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore =>
-        val edge = edgeWithScore.edge
-        val score = edgeWithScore.score
-        val label = edgeWithScore.label
-
-        /** Select */
-        val mergedPropsWithTs =
-          if (queryOption.selectColumns.isEmpty) {
-            edge.propertyValuesInner()
-          } else {
-            val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp))
-            edge.propertyValues(queryOption.selectColumns) ++ initial
-          }
-
-        val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
-        edgeWithScore.copy(edge = newEdge)
-      }
-    } else Nil
-
-    // skip
-    if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores
-    else {
-      val degreeScore = 0.0
-
-      val sampled =
-        if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
-        else edgeWithScores
-
-      val withScores = for {
-        edgeWithScore <- sampled
-      } yield {
-        val edge = edgeWithScore.edge
-        val edgeScore = edgeWithScore.score
-        val score = queryParam.scorePropagateOp match {
-          case "plus" => edgeScore + prevScore
-          case "divide" =>
-            if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
-            else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
-          case _ => edgeScore * prevScore
-        }
-
-        val tsVal = processTimeDecay(queryParam, edge)
-        val newScore = degreeScore + score
-        //          val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
-        val newEdge = edge.copy(parentEdges = parents)
-        edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
-      }
-
-      val normalized =
-        if (queryParam.shouldNormalize) normalize(withScores)
-        else withScores
-
-      normalized
-    }
-  }
-
-  private def buildResult[T](query: Query,
-                             stepIdx: Int,
-                             stepResultLs: Seq[(QueryRequest, StepResult)],
-                             parentEdges: Map[VertexId, Seq[EdgeWithScore]])
-                            (createFunc: (EdgeWithScore, Seq[LabelMeta]) => T)
-                            (implicit ev: WithScore[T]): ListBuffer[T] = {
-    import scala.collection._
-
-    val results = ListBuffer.empty[T]
-    val sequentialLs: ListBuffer[(HashKey, FilterHashKey, T, QueryParam)] = ListBuffer.empty
-    val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, T)]] = mutable.HashMap.empty
-    val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
-    val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
-
-    var numOfDuplicates = 0
-    val queryOption = query.queryOption
-    val step = query.steps(stepIdx)
-    val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet
-    val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet
-
-    stepResultLs.foreach { case (queryRequest, stepInnerResult) =>
-      val queryParam = queryRequest.queryParam
-      val label = queryParam.label
-      val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir)
-      val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir)
-
-      val propsSelectColumns = (for {
-        column <- queryOption.propsSelectColumns
-        labelMeta <- label.metaPropsInvMap.get(column)
-      } yield labelMeta)
-
-      for {
-        edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges)
-      } {
-        val edge = edgeWithScore.edge
-        val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
-        //        params += (hashKey -> queryParam) //
-
-        /** check if this edge should be exlcuded. */
-        if (shouldBeExcluded) {
-          edgesToExclude.add(filterHashKey)
-        } else {
-          if (shouldBeIncluded) {
-            edgesToInclude.add(filterHashKey)
-          }
-          val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns)
-
-          sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
-          duplicates.get(hashKey) match {
-            case None =>
-              val newLs = ListBuffer.empty[(FilterHashKey, T)]
-              newLs += (filterHashKey -> newEdgeWithScore)
-              duplicates += (hashKey -> newLs) //
-            case Some(old) =>
-              numOfDuplicates += 1
-              old += (filterHashKey -> newEdgeWithScore) //
-          }
-        }
-      }
-    }
-
-
-    if (numOfDuplicates == 0) {
-      // no duplicates at all.
-      for {
-        (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs
-        if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
-      } {
-        results += edgeWithScore
-      }
-    } else {
-      // need to resolve duplicates.
-      val seen = new mutable.HashSet[HashKey]()
-      for {
-        (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs
-        if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
-        if !seen.contains(hashKey)
-      } {
-        //        val queryParam = params(hashKey)
-        processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) =>
-          if (ev.score(duplicate) >= queryParam.threshold) {
-            seen += hashKey
-            results += duplicate
-          }
-        }
-      }
-    }
-    results
-  }
-
-}
-
-class Graph(_config: Config)(implicit val ec: ExecutionContext) extends TpGraph {
-
-  import Graph._
-
-  val config = _config.withFallback(Graph.DefaultConfig)
-
-  Model.apply(config)
-  Model.loadCache()
-
-  val MaxRetryNum = config.getInt("max.retry.number")
-  val MaxBackOff = config.getInt("max.back.off")
-  val BackoffTimeout = config.getInt("back.off.timeout")
-  val DeleteAllFetchCount = config.getInt("delete.all.fetch.count")
-  val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
-  val FailProb = config.getDouble("hbase.fail.prob")
-  val LockExpireDuration = config.getInt("lock.expire.time")
-  val MaxSize = config.getInt("future.cache.max.size")
-  val ExpireAfterWrite = config.getInt("future.cache.expire.after.write")
-  val ExpireAfterAccess = config.getInt("future.cache.expire.after.access")
-
-  val scheduledEx = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
-
-  private def confWithFallback(conf: Config): Config = {
-    conf.withFallback(config)
-  }
-
-  /**
-    * TODO: we need to some way to handle malformed configuration for storage.
-    */
-  val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = {
-    val labels = Label.findAll()
-    val services = Service.findAll()
-
-    val labelConfigs = labels.flatMap(_.toStorageConfig)
-    val serviceConfigs = services.flatMap(_.toStorageConfig)
-
-    val configs = (labelConfigs ++ serviceConfigs).map { conf =>
-      confWithFallback(conf)
-    }.toSet
-
-    val pools = new java.util.HashMap[Config, Storage[_, _]]()
-    configs.foreach { config =>
-      pools.put(config, Graph.initStorage(this, config)(ec))
-    }
-
-    val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]()
-
-    labels.foreach { label =>
-      if (label.storageConfigOpt.isDefined) {
-        m += (s"label:${label.label}" -> pools(label.storageConfigOpt.get))
-      }
-    }
-
-    services.foreach { service =>
-      if (service.storageConfigOpt.isDefined) {
-        m += (s"service:${service.serviceName}" -> pools(service.storageConfigOpt.get))
-      }
-    }
-
-    m
-  }
-
-  val defaultStorage: Storage[_, _] = Graph.initStorage(this, config)(ec)
-
-  /** QueryLevel FutureCache */
-  val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty)
-
-  for {
-    entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey)
-    (k, v) = (entry.getKey, entry.getValue)
-  } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
-
-  def getStorage(service: Service): Storage[_, _] = {
-    storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
-  }
-
-  def getStorage(label: Label): Storage[_, _] = {
-    storagePool.getOrElse(s"label:${label.label}", defaultStorage)
-  }
-
-  def flushStorage(): Unit = {
-    storagePool.foreach { case (_, storage) =>
-
-      /** flush is blocking */
-      storage.flush()
-    }
-  }
-
-  def fallback = Future.successful(StepResult.Empty)
-
-  def checkEdges(edges: Seq[Edge]): Future[StepResult] = {
-    val futures = for {
-      edge <- edges
-    } yield {
-      fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
-        edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label))
-      }
-    }
-
-    Future.sequence(futures).map { edgeWithScoreLs =>
-      val edgeWithScores = edgeWithScoreLs.flatten
-      StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil)
-    }
-  }
-
-  //  def checkEdges(edges: Seq[Edge]): Future[StepResult] = storage.checkEdges(edges)
-
-  def getEdges(q: Query): Future[StepResult] = {
-    Try {
-      if (q.steps.isEmpty) {
-        // TODO: this should be get vertex query.
-        fallback
-      } else {
-        val filterOutFuture = q.queryOption.filterOutQuery match {
-          case None => fallback
-          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
-        }
-        for {
-          stepResult <- getEdgesStepInner(q)
-          filterOutInnerResult <- filterOutFuture
-        } yield {
-          if (filterOutInnerResult.isEmpty) stepResult
-          else StepResult.filterOut(this, q.queryOption, stepResult, filterOutInnerResult)
-        }
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        fallback
-    } get
-  }
-
-  def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
-    Try {
-      if (q.steps.isEmpty) fallback
-      else {
-
-        val queryOption = q.queryOption
-        def fetch: Future[StepResult] = {
-          val startStepInnerResult = QueryResult.fromVertices(this, q)
-          q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
-            for {
-              prevStepInnerResult <- prevStepInnerResultFuture
-              currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult)
-            } yield {
-              currentStepInnerResult.copy(
-                accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors,
-                failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount
-              )
-            }
-          }
-        }
-
-        fetch
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        fallback
-    } get
-  }
-
-  def fetchStep(orgQuery: Query,
-                stepIdx: Int,
-                stepInnerResult: StepResult,
-                buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
-    if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
-    else {
-      val edgeWithScoreLs = stepInnerResult.edgeWithScores
-
-      val q = orgQuery
-      val queryOption = orgQuery.queryOption
-      val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
-      val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
-      val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
-      val step = q.steps(stepIdx)
-
-     val alreadyVisited =
-        if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
-        else alreadyVisitedVertices(stepInnerResult.edgeWithScores)
-
-      val initial = (Map.empty[Vertex, Double], Map.empty[Vertex, ArrayBuffer[EdgeWithScore]])
-      val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) =>
-        val key = edgeWithScore.edge.tgtVertex
-        val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score
-        val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore])
-        buffer += edgeWithScore
-        (sum + (key -> newScore), group + (key -> buffer))
-      }
-      val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold)
-      val prevStepTgtVertexIdEdges = grouped.map(t => t._1.id -> t._2)
-
-      val nextStepSrcVertices = if (prevStepLimit >= 0) {
-        groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
-      } else {
-        groupedByFiltered.toSeq
-      }
-
-      val queryRequests = for {
-        (vertex, prevStepScore) <- nextStepSrcVertices
-        queryParam <- step.queryParams
-      } yield {
-        val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
-        val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0
-        QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight)
-      }
-
-      val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
-
-      filterEdges(orgQuery, stepIdx, queryRequests,
-        fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec)
-    }
-  }
-
-
-  /**
-    * responsible to fire parallel fetch call into storage and create future that will return merged result.
-    *
-    * @param queryRequests
-    * @param prevStepEdges
-    * @return
-    */
-  def fetches(queryRequests: Seq[QueryRequest],
-              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
-
-    val reqWithIdxs = queryRequests.zipWithIndex
-    val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label)
-    val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
-      for {
-        prev <- prevFuture
-        cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
-      } yield {
-        prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
-      }
-    }
-    aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) }
-  }
-
-
-  def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
-    Try {
-      if (mq.queries.isEmpty) fallback
-      else {
-        val filterOutFuture = mq.queryOption.filterOutQuery match {
-          case None => fallback
-          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
-        }
-
-        val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
-        for {
-          multiQueryResults <- multiQueryFutures
-          filterOutInnerResult <- filterOutFuture
-        } yield {
-          StepResult.merges(mq.queryOption, multiQueryResults, mq.weights, filterOutInnerResult)
-        }
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        fallback
-    } get
-  }
-
-
-  def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = {
-    /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
-      * so use empty cacheKey.
-      * */
-    val queryParam = QueryParam(labelName = edge.innerLabel.label,
-      direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
-      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
-      cacheTTLInMillis = -1)
-    val q = Query.toQuery(Seq(edge.srcVertex), queryParam)
-    val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
-
-    val storage = getStorage(edge.innerLabel)
-    storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
-      val (edgeOpt, kvOpt) =
-        if (kvs.isEmpty) (None, None)
-        else {
-          val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
-          val _kvOpt = kvs.headOption
-          (snapshotEdgeOpt, _kvOpt)
-        }
-      (queryParam, edgeOpt, kvOpt)
-    } recoverWith { case ex: Throwable =>
-      logger.error(s"fetchQueryParam failed. fallback return.", ex)
-      throw FetchTimeoutException(s"${edge.toLogString}")
-    }
-  }
-
-  def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
-    val verticesWithIdx = vertices.zipWithIndex
-    val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
-      getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
-    }
-
-    Future.sequence(futures).map { ls =>
-      ls.flatten.toSeq.sortBy(_._2).map(_._1)
-    }
-  }
-
-  /** mutate */
-  def deleteAllAdjacentEdges(srcVertices: Seq[Vertex],
-                             labels: Seq[Label],
-                             dir: Int,
-                             ts: Long): Future[Boolean] = {
-
-    val requestTs = ts
-    val vertices = srcVertices
-    /** create query per label */
-    val queries = for {
-      label <- labels
-    } yield {
-      val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir),
-        offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw)
-      val step = Step(List(queryParam))
-      Query(vertices, Vector(step))
-    }
-
-    //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
-    val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
-      fetchAndDeleteAll(queries, requestTs)
-    } { case (allDeleted, deleteSuccess) =>
-      allDeleted && deleteSuccess
-    }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
-
-    retryFuture onFailure {
-      case ex =>
-        logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
-    }
-
-    retryFuture
-  }
-
-  def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
-    val future = for {
-      stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true)))
-      (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
-    } yield {
-      //        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
-      (allDeleted, ret)
-    }
-
-    Extensions.retryOnFailure(MaxRetryNum) {
-      future
-    } {
-      logger.error(s"fetch and deleteAll failed.")
-      (true, false)
-    }
-
-  }
-
-  def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
-                              requestTs: Long): Future[(Boolean, Boolean)] = {
-    stepInnerResultLs.foreach { stepInnerResult =>
-      if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
-    }
-    val futures = for {
-      stepInnerResult <- stepInnerResultLs
-      deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
-      if deleteStepInnerResult.edgeWithScores.nonEmpty
-    } yield {
-      val head = deleteStepInnerResult.edgeWithScores.head
-      val label = head.edge.innerLabel
-      val ret = label.schemaVersion match {
-        case HBaseType.VERSION3 | HBaseType.VERSION4 =>
-          if (label.consistencyLevel == "strong") {
-            /**
-              * read: snapshotEdge on queryResult = O(N)
-              * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
-              */
-            mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity))
-          } else {
-            getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
-          }
-        case _ =>
-
-          /**
-            * read: x
-            * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
-            */
-          getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
-      }
-      ret
-    }
-
-    if (futures.isEmpty) {
-      // all deleted.
-      Future.successful(true -> true)
-    } else {
-      Future.sequence(futures).map { rets => false -> rets.forall(identity) }
-    }
-  }
-
-  def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = {
-    val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
-      (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
-    }
-    if (filtered.isEmpty) StepResult.Empty
-    else {
-      val head = filtered.head
-      val label = head.edge.innerLabel
-      val edgeWithScoreLs = filtered.map { edgeWithScore =>
-          val edge = edgeWithScore.edge
-          val copiedEdge = label.consistencyLevel match {
-            case "strong" =>
-              edge.copyEdge(op = GraphUtil.operations("delete"),
-                version = requestTs, propsWithTs = Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
-            case _ =>
-              edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
-          }
-//        val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
-//          case "strong" =>
-//            val edge = edgeWithScore.edge
-//            edge.property(LabelMeta.timestamp.name, requestTs)
-//            val _newPropsWithTs = edge.updatePropsWithTs()
-//
-//            (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
-//          case _ =>
-//            val oldEdge = edgeWithScore.edge
-//            (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs())
-//        }
-//
-//        val copiedEdge =
-//          edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
-
-        val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
-        //      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
-        edgeToDelete
-      }
-      //Degree edge?
-      StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false)
-    }
-  }
-
-  //  def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
-  //    storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
-
-  def mutateElements(elements: Seq[GraphElement],
-                     withWait: Boolean = false): Future[Seq[Boolean]] = {
-
-    val edgeBuffer = ArrayBuffer[(Edge, Int)]()
-    val vertexBuffer = ArrayBuffer[(Vertex, Int)]()
-
-    elements.zipWithIndex.foreach {
-      case (e: Edge, idx: Int) => edgeBuffer.append((e, idx))
-      case (v: Vertex, idx: Int) => vertexBuffer.append((v, idx))
-      case any@_ => logger.error(s"Unknown type: ${any}")
-    }
-
-    val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result =>
-      edgeBuffer.map(_._2).zip(result)
-    }
-
-    val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result =>
-      vertexBuffer.map(_._2).zip(result)
-    }
-
-    val graphFuture = for {
-      edgesMutated <- edgeFutureWithIdx
-      verticesMutated <- vertexFutureWithIdx
-    } yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2)
-
-    graphFuture
-
-  }
-
-  //  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
-
-  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = {
-    val edgeWithIdxs = edges.zipWithIndex
-
-    val (strongEdges, weakEdges) =
-      edgeWithIdxs.partition { case (edge, idx) =>
-        val e = edge
-        e.innerLabel.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")
-      }
-
-    val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
-      val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) =>
-        val storage = getStorage(label)
-        val edges = edgeGroup.map(_._1)
-        val idxs = edgeGroup.map(_._2)
-
-        /** multiple edges with weak consistency level will be processed as batch */
-        val mutations = edges.flatMap { edge =>
-          val (_, edgeUpdate) =
-            if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge)
-            else Edge.buildOperation(None, Seq(edge))
-
-          storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate) ++ storage.snapshotEdgeMutations(edgeUpdate) ++ storage.increments(edgeUpdate)
-        }
-
-        storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>
-          idxs.map(idx => idx -> ret)
-        }
-      }
-      Future.sequence(futures)
-    }
-    val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") }
-
-    val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
-      deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _)
-    }
-
-    val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) =>
-      val edges = edgeGroup.map(_._1)
-      val idxs = edgeGroup.map(_._2)
-      val storage = getStorage(label)
-      storage.mutateStrongEdges(edges, withWait = true).map { rets =>
-        idxs.zip(rets)
-      }
-    }
-
-    for {
-      weak <- Future.sequence(weakEdgesFutures)
-      deleteAll <- Future.sequence(deleteAllFutures)
-      strong <- Future.sequence(strongEdgesFutures)
-    } yield {
-      (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2)
-    }
-  }
-
-  def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = {
-    val verticesWithIdx = vertices.zipWithIndex
-    val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
-      getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
-    }
-    Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
-  }
-
-  def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
-    val edgesWithIdx = edges.zipWithIndex
-    val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
-      getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
-    }
-    Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
-  }
-
-  def updateDegree(edge: Edge, degreeVal: Long = 0): Future[Boolean] = {
-    val label = edge.innerLabel
-
-    val storage = getStorage(label)
-    val kvs = storage.buildDegreePuts(edge, degreeVal)
-
-    storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = true)
-  }
-
-  def shutdown(): Unit = {
-    flushStorage()
-    Model.shutdown()
-  }
-
-  def addEdge(srcId: Any,
-              tgtId: Any,
-              labelName: String,
-              direction: String = "out",
-              props: Map[String, Any] = Map.empty,
-              ts: Long = System.currentTimeMillis(),
-              operation: String = "insert",
-              withWait: Boolean = true): Future[Boolean] = {
-
-    val innerEdges = Seq(toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation))
-    mutateEdges(innerEdges, withWait).map(_.headOption.getOrElse(false))
-  }
-
-  def addVertex(serviceName: String,
-                columnName: String,
-                id: Any,
-                props: Map[String, Any] = Map.empty,
-                ts: Long = System.currentTimeMillis(),
-                operation: String = "insert",
-                withWait: Boolean = true): Future[Boolean] = {
-    val innerVertices = Seq(toVertex(serviceName, columnName, id, props.toMap, ts, operation))
-    mutateVertices(innerVertices, withWait).map(_.headOption.getOrElse(false))
-  }
-
-  def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
-    val parts = GraphUtil.split(s)
-    val logType = parts(2)
-    val element = if (logType == "edge" | logType == "e") {
-      /** current only edge is considered to be bulk loaded */
-      labelMapping.get(parts(5)) match {
-        case None =>
-        case Some(toReplace) =>
-          parts(5) = toReplace
-      }
-      toEdge(parts)
-    } else if (logType == "vertex" | logType == "v") {
-      toVertex(parts)
-    } else {
-      throw new GraphExceptions.JsonParseException("log type is not exist in log.")
-    }
-
-    element
-  } recover {
-    case e: Exception =>
-      logger.error(s"[toElement]: $e", e)
-      None
-  } get
-
-
-  def toVertex(s: String): Option[Vertex] = {
-    toVertex(GraphUtil.split(s))
-  }
-
-  def toEdge(s: String): Option[Edge] = {
-    toEdge(GraphUtil.split(s))
-  }
-
-  def toEdge(parts: Array[String]): Option[Edge] = Try {
-    val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
-    val tempDirection = if (parts.length >= 8) parts(7) else "out"
-    val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
-    val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
-    Option(edge)
-  } recover {
-    case e: Exception =>
-      logger.error(s"[toEdge]: $e", e)
-      throw e
-  } get
-
-  def toVertex(parts: Array[String]): Option[Vertex] = Try {
-    val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
-    val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
-    Option(vertex)
-  } recover {
-    case e: Throwable =>
-      logger.error(s"[toVertex]: $e", e)
-      throw e
-  } get
-
-
-  def newSnapshotEdge(srcVertex: Vertex,
-                      tgtVertex: Vertex,
-                      label: Label,
-                      dir: Int,
-                      op: Byte,
-                      version: Long,
-                      propsWithTs: Edge.State,
-                      pendingEdgeOpt: Option[Edge],
-                      statusCode: Byte = 0,
-                      lockTs: Option[Long],
-                      tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
-    val snapshotEdge = new SnapshotEdge(this, srcVertex, tgtVertex, label, dir, op, version, Edge.EmptyProps,
-      pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
-    Edge.fillPropsWithTs(snapshotEdge, propsWithTs)
-    snapshotEdge
-  }
-
-  def newEdge(srcVertex: Vertex,
-              tgtVertex: Vertex,
-              innerLabel: Label,
-              dir: Int,
-              op: Byte = GraphUtil.defaultOpByte,
-              version: Long = System.currentTimeMillis(),
-              propsWithTs: Edge.State,
-              parentEdges: Seq[EdgeWithScore] = Nil,
-              originalEdgeOpt: Option[Edge] = None,
-              pendingEdgeOpt: Option[Edge] = None,
-              statusCode: Byte = 0,
-              lockTs: Option[Long] = None,
-              tsInnerValOpt: Option[InnerValLike] = None): Edge = {
-    val edge = new Edge(this, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps,
-      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
-    Edge.fillPropsWithTs(edge, propsWithTs)
-    edge
-  }
-  def toEdge(srcId: Any,
-             tgtId: Any,
-             labelName: String,
-             direction: String,
-             props: Map[String, Any] = Map.empty,
-             ts: Long = System.currentTimeMillis(),
-             operation: String = "insert"): Edge = {
-    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
-
-    val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion)
-    val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion)
-
-    val srcColId = label.srcColumn.id.get
-    val tgtColId = label.tgtColumn.id.get
-
-    val srcVertex = newVertex(SourceVertexId(label.srcColumn, srcVertexId), System.currentTimeMillis())
-    val tgtVertex = newVertex(TargetVertexId(label.tgtColumn, tgtVertexId), System.currentTimeMillis())
-    val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
-
-    val labelWithDir = LabelWithDirection(label.id.get, dir)
-    val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
-    val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
-    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
-
-    new Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs)
-  }
-
-  def newVertex(id: VertexId,
-                ts: Long = System.currentTimeMillis(),
-                props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
-                op: Byte = 0,
-                belongLabelIds: Seq[Int] = Seq.empty): Vertex = {
-    new Vertex(this, id, ts, props, op, belongLabelIds)
-  }
-  def toVertex(serviceName: String,
-               columnName: String,
-               id: Any,
-               props: Map[String, Any] = Map.empty,
-               ts: Long = System.currentTimeMillis(),
-               operation: String = "insert"): Vertex = {
-
-    val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
-    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
-    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
-
-    val srcVertexId = VertexId(column, toInnerVal(id.toString, column.columnType, column.schemaVersion))
-    val propsInner = column.propsToInnerVals(props) ++
-      Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
-
-    new Vertex(this, srcVertexId, ts, propsInner, op)
-  }
-
-  override def vertices(objects: AnyRef*): util.Iterator[structure.Vertex] = ???
-
-  override def tx(): Transaction = ???
-
-  override def edges(objects: AnyRef*): util.Iterator[structure.Edge] = ???
-
-  override def variables(): Variables = ???
-
-  override def configuration(): Configuration = ???
-
-  override def addVertex(objects: AnyRef*): structure.Vertex = ???
-
-  override def close(): Unit = ???
-
-  override def compute[C <: GraphComputer](aClass: Class[C]): C = ???
-
-  override def compute(): GraphComputer = ???
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 60900be..064a3d1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -256,7 +256,7 @@ object Management {
   }
 }
 
-class Management(graph: Graph) {
+class Management(graph: S2Graph) {
   import Management._
 
   def createStorageTable(zkAddr: String,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 083159f..b22eb65 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -53,7 +53,7 @@ object PostProcess {
     case _ => Json.obj("message" -> ex.getMessage)
   }
 
-  def s2EdgeParent(graph: Graph,
+  def s2EdgeParent(graph: S2Graph,
                    queryOption: QueryOption,
                    parentEdges: Seq[EdgeWithScore]): JsValue = {
     if (parentEdges.isEmpty) JsNull
@@ -141,7 +141,7 @@ object PostProcess {
     }
   }
 
-  def s2VertexToJson(s2Vertex: Vertex): Option[JsValue] = {
+  def s2VertexToJson(s2Vertex: S2Vertex): Option[JsValue] = {
     val props = for {
       (k, v) <- s2Vertex.properties
       jsVal <- anyValToJsValue(v)
@@ -160,7 +160,7 @@ object PostProcess {
     }
   }
 
-  def verticesToJson(s2Vertices: Seq[Vertex]): JsValue =
+  def verticesToJson(s2Vertices: Seq[S2Vertex]): JsValue =
     Json.toJson(s2Vertices.flatMap(s2VertexToJson(_)))
 
   def withOptionalFields(queryOption: QueryOption,
@@ -189,7 +189,7 @@ object PostProcess {
     case _ => js
   }
 
-  def toJson(orgQuery: Option[JsValue])(graph: Graph,
+  def toJson(orgQuery: Option[JsValue])(graph: S2Graph,
                                         queryOption: QueryOption,
                                         stepResult: StepResult): JsValue = {
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 170fd0b..eb36258 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -27,7 +27,7 @@ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.parsers.{Where, WhereParser}
 import org.apache.s2graph.core.rest.TemplateHelper
 import org.apache.s2graph.core.storage.StorageSerializable._
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs, LabelWithDirection}
+import org.apache.s2graph.core.types._
 import org.hbase.async.ColumnRangeFilter
 import play.api.libs.json.{JsString, JsNull, JsValue, Json}
 
@@ -39,7 +39,7 @@ object Query {
   def apply(query: Query): Query = {
     Query(query.vertices, query.steps, query.queryOption, query.jsonQuery)
   }
-  def toQuery(srcVertices: Seq[Vertex], queryParam: QueryParam) = Query(srcVertices, Vector(Step(List(queryParam))))
+  def toQuery(srcVertices: Seq[S2Vertex], queryParams: Seq[QueryParam]) = Query(srcVertices, Vector(Step(queryParams)))
 
 }
 
@@ -96,7 +96,7 @@ case class QueryOption(removeCycle: Boolean = false,
 
 }
 
-case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
+case class Query(vertices: Seq[S2Vertex] = Seq.empty[S2Vertex],
                  steps: IndexedSeq[Step] = Vector.empty[Step],
                  queryOption: QueryOption = QueryOption(),
                  jsonQuery: JsValue = JsNull) {
@@ -162,7 +162,7 @@ case class EdgeTransformer(jsValue: JsValue) {
     }
   }
 
-  def toInnerValOpt(queryParam: QueryParam, edge: Edge, fieldName: String): Option[InnerValLike] = {
+  def toInnerValOpt(queryParam: QueryParam, edge: S2Edge, fieldName: String): Option[InnerValLike] = {
     fieldName match {
       case LabelMeta.to.name => Option(edge.tgtVertex.innerId)
       case LabelMeta.from.name => Option(edge.srcVertex.innerId)
@@ -170,7 +170,7 @@ case class EdgeTransformer(jsValue: JsValue) {
     }
   }
 
-  def transform(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = {
+  def transform(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = {
     if (isDefault) Seq(edge)
     else {
       val edges = for {
@@ -218,7 +218,7 @@ case class Step(queryParams: Seq[QueryParam],
   }
 }
 
-case class VertexParam(vertices: Seq[Vertex]) {
+case class VertexParam(vertices: Seq[S2Vertex]) {
   var filters: Option[Map[Byte, InnerValLike]] = None
 
   def has(what: Option[Map[Byte, InnerValLike]]): VertexParam = {
@@ -306,11 +306,10 @@ case class QueryParam(labelName: String,
     else label.indexNameMap.getOrElse(indexName, throw new RuntimeException(s"$indexName indexName is not found.")).seq
 
   lazy val tgtVertexInnerIdOpt = tgtVertexIdOpt.map { id =>
-    val tmp = label.tgtColumnWithDir(dir)
-    toInnerVal(id, tmp.columnType, tmp.schemaVersion)
+    CanInnerValLike.anyToInnerValLike.toInnerVal(id)(label.tgtColumnWithDir(dir).schemaVersion)
   }
 
-  def buildInterval(edgeOpt: Option[Edge]) = intervalOpt match {
+  def buildInterval(edgeOpt: Option[S2Edge]) = intervalOpt match {
     case None => Array.empty[Byte] -> Array.empty[Byte]
     case Some(interval) =>
       val (froms, tos) = interval
@@ -358,7 +357,7 @@ case class QueryParam(labelName: String,
     Bytes.add(bytes, optionalCacheKey)
   }
 
-  private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[Edge]): Seq[(LabelMeta, InnerValLike)] = {
+  private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[S2Edge]): Seq[(LabelMeta, InnerValLike)] = {
     kvs.map { case (propKey, propValJs) =>
       propValJs match {
         case JsString(in) if edgeOpt.isDefined && in.contains("_parent.") =>
@@ -376,7 +375,7 @@ case class QueryParam(labelName: String,
 
           val propVal =
             if (InnerVal.isNumericType(labelMeta.dataType)) {
-              InnerVal.withLong(edge.property(labelMeta.name).value.asInstanceOf[BigDecimal].longValue() + padding, label.schemaVersion)
+              InnerVal.withLong(edge.property(labelMeta.name).value.toString.toLong + padding, label.schemaVersion)
             } else {
               edge.property(labelMeta.name).asInstanceOf[S2Property[_]].innerVal
             }
@@ -391,7 +390,7 @@ case class QueryParam(labelName: String,
     }
   }
 
-  def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[Edge] = None) = {
+  def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[S2Edge] = None) = {
     val fromInnerVal = convertToInner(froms, edgeOpt)
     val toInnerVal = convertToInner(tos, edgeOpt)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 3753d0f..bad8361 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.{Seq, mutable}
 
 object QueryResult {
-  def fromVertices(graph: Graph,
+  def fromVertices(graph: S2Graph,
                    query: Query): StepResult = {
     if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) {
       StepResult.Empty
@@ -41,7 +41,7 @@ object QueryResult {
         vertex <- query.vertices
       } yield {
           val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
-          val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore, queryParam.label)
+          val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label)
           edgeWithScore
         }
       StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil, false)
@@ -51,7 +51,7 @@ object QueryResult {
 
 case class QueryRequest(query: Query,
                         stepIdx: Int,
-                        vertex: Vertex,
+                        vertex: S2Vertex,
                         queryParam: QueryParam,
                         prevStepScore: Double = 1.0,
                         labelWeight: Double = 1.0) {
@@ -73,7 +73,7 @@ object WithScore {
   }
 }
 
-case class EdgeWithScore(edge: Edge,
+case class EdgeWithScore(edge: S2Edge,
                          score: Double,
                          label: Label,
                          orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues,
@@ -283,7 +283,7 @@ object StepResult {
   }
 
   //TODO: Optimize this.
-  def filterOut(graph: Graph,
+  def filterOut(graph: S2Graph,
                 queryOption: QueryOption,
                 baseStepResult: StepResult,
                 filterOutStepResult: StepResult): StepResult = {