You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/12/01 08:30:12 UTC

[02/10] incubator-s2graph git commit: [S2GRAPH-131]: Add actual implementation on interfaces from TinkerPop3 structure package. - Change core.Edge/Vertex/Graph to core.S2Edge/S2Vertex/S2Graph. - Implement base interfaces for tinkerpop3 structure packag

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
new file mode 100644
index 0000000..51a80f9
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -0,0 +1,1397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import java.util
+import java.util.concurrent.{Executors, TimeUnit}
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.commons.configuration.Configuration
+import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, LabelNotExistException}
+import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
+import org.apache.s2graph.core.storage.{SKeyValue, Storage}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
+import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.Graph.Variables
+import org.apache.tinkerpop.gremlin.structure.util.ElementHelper
+import org.apache.tinkerpop.gremlin.structure.{Edge, Graph, T, Transaction}
+import play.api.libs.json.{JsObject, Json}
+
+import scala.annotation.tailrec
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.{Random, Try}
+
+
+object S2Graph {
+
+  type HashKey = (Int, Int, Int, Int, Boolean)
+  type FilterHashKey = (Int, Int)
+
+
+  val DefaultScore = 1.0
+
+  private val DefaultConfigs: Map[String, AnyRef] = Map(
+    "hbase.zookeeper.quorum" -> "localhost",
+    "hbase.table.name" -> "s2graph",
+    "hbase.table.compression.algorithm" -> "gz",
+    "phase" -> "dev",
+    "db.default.driver" ->  "org.h2.Driver",
+    "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
+    "db.default.password" -> "graph",
+    "db.default.user" -> "graph",
+    "cache.max.size" -> java.lang.Integer.valueOf(10000),
+    "cache.ttl.seconds" -> java.lang.Integer.valueOf(60),
+    "hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
+    "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort),
+    "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000),
+    "max.retry.number" -> java.lang.Integer.valueOf(100),
+    "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
+    "max.back.off" -> java.lang.Integer.valueOf(100),
+    "back.off.timeout" -> java.lang.Integer.valueOf(1000),
+    "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
+    "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
+    "delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
+    "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
+    "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
+    "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
+    "future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
+    "query.future.cache.max.size" -> java.lang.Integer.valueOf(1000),
+    "query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
+    "query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
+    "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
+    "s2graph.storage.backend" -> "hbase",
+    "query.hardlimit" -> java.lang.Integer.valueOf(100000),
+    "hbase.zookeeper.znode.parent" -> "/hbase",
+    "query.log.sample.rate" -> Double.box(0.05)
+  )
+
+  var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
+
+
+
+  def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = {
+    val storageBackend = config.getString("s2graph.storage.backend")
+    logger.info(s"[InitStorage]: $storageBackend")
+
+    storageBackend match {
+      case "hbase" => new AsynchbaseStorage(graph, config)(ec)
+      case _ => throw new RuntimeException("not supported storage.")
+    }
+  }
+
+  def parseCacheConfig(config: Config, prefix: String): Config = {
+    import scala.collection.JavaConversions._
+
+    val kvs = new java.util.HashMap[String, AnyRef]()
+    for {
+      entry <- config.entrySet()
+      (k, v) = (entry.getKey, entry.getValue) if k.startsWith(prefix)
+    } yield {
+      val newKey = k.replace(prefix, "")
+      kvs.put(newKey, v.unwrapped())
+    }
+    ConfigFactory.parseMap(kvs)
+  }
+
+  /** Global helper functions */
+  @tailrec
+  final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
+    if (range < sampleNumber || set.size == sampleNumber) set
+    else randomInt(sampleNumber, range, set + Random.nextInt(range))
+  }
+
+  def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
+    if (edges.size <= n) {
+      edges
+    } else {
+      val plainEdges = if (queryRequest.queryParam.offset == 0) {
+        edges.tail
+      } else edges
+
+      val randoms = randomInt(n, plainEdges.size)
+      var samples = List.empty[EdgeWithScore]
+      var idx = 0
+      plainEdges.foreach { e =>
+        if (randoms.contains(idx)) samples = e :: samples
+        idx += 1
+      }
+      samples
+    }
+  }
+
+  def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+    val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
+    edgeWithScores.map { edgeWithScore =>
+      edgeWithScore.copy(score = edgeWithScore.score / sum)
+    }
+  }
+
+  def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2Vertex), Boolean] = {
+    val vertices = for {
+      edgeWithScore <- edgeWithScoreLs
+      edge = edgeWithScore.edge
+      vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
+    } yield (edge.labelWithDir, vertex) -> true
+
+    vertices.toMap
+  }
+
+  /** common methods for filter out, transform, aggregate queryResult */
+  def convertEdges(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = {
+    for {
+      convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
+    } yield convertedEdge
+  }
+
+  def processTimeDecay(queryParam: QueryParam, edge: S2Edge) = {
+    /* process time decay */
+    val tsVal = queryParam.timeDecay match {
+      case None => 1.0
+      case Some(timeDecay) =>
+        val tsVal = try {
+          val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name)
+          innerValWithTsOpt.map { innerValWithTs =>
+            val innerVal = innerValWithTs.innerVal
+            timeDecay.labelMeta.dataType match {
+              case InnerVal.LONG => innerVal.value match {
+                case n: BigDecimal => n.bigDecimal.longValue()
+                case _ => innerVal.toString().toLong
+              }
+              case _ => innerVal.toString().toLong
+            }
+          } getOrElse (edge.ts)
+        } catch {
+          case e: Exception =>
+            logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
+            edge.ts
+        }
+        val timeDiff = queryParam.timestamp - tsVal
+        timeDecay.decay(timeDiff)
+    }
+
+    tsVal
+  }
+
+  def processDuplicates[T](queryParam: QueryParam,
+                           duplicates: Seq[(FilterHashKey, T)])(implicit ev: WithScore[T]): Seq[(FilterHashKey, T)] = {
+
+    if (queryParam.label.consistencyLevel != "strong") {
+      //TODO:
+      queryParam.duplicatePolicy match {
+        case DuplicatePolicy.First => Seq(duplicates.head)
+        case DuplicatePolicy.Raw => duplicates
+        case DuplicatePolicy.CountSum =>
+          val countSum = duplicates.size
+          val (headFilterHashKey, headEdgeWithScore) = duplicates.head
+          Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum))
+        case _ =>
+          val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) }
+          val (headFilterHashKey, headEdgeWithScore) = duplicates.head
+          Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum))
+      }
+    } else {
+      duplicates
+    }
+  }
+
+  def toHashKey(queryParam: QueryParam, edge: S2Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
+    val src = edge.srcVertex.innerId.hashCode()
+    val tgt = edge.tgtVertex.innerId.hashCode()
+    val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
+    val filterHashKey = (src, tgt)
+
+    (hashKey, filterHashKey)
+  }
+
+  def filterEdges(q: Query,
+                  stepIdx: Int,
+                  queryRequests: Seq[QueryRequest],
+                  queryResultLsFuture: Future[Seq[StepResult]],
+                  queryParams: Seq[QueryParam],
+                  alreadyVisited: Map[(LabelWithDirection, S2Vertex), Boolean] = Map.empty,
+                  buildLastStepInnerResult: Boolean = true,
+                  parentEdges: Map[VertexId, Seq[EdgeWithScore]])
+                 (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = {
+
+    queryResultLsFuture.map { queryRequestWithResultLs =>
+      val (cursors, failCount) = {
+        val _cursors = ArrayBuffer.empty[Array[Byte]]
+        var _failCount = 0
+
+        queryRequestWithResultLs.foreach { stepResult =>
+          _cursors.append(stepResult.cursors: _*)
+          _failCount += stepResult.failCount
+        }
+
+        _cursors -> _failCount
+      }
+
+
+      if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount)
+      else {
+        val isLastStep = stepIdx == q.steps.size - 1
+        val queryOption = q.queryOption
+        val step = q.steps(stepIdx)
+
+        val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs)
+        val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult
+        val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
+
+        if (shouldBuildInnerResults) {
+          val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
+            edgeWithScore
+          }
+
+          /** process step group by */
+          val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
+          StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
+
+        } else {
+          val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
+            val edge = edgeWithScore.edge
+            val score = edgeWithScore.score
+            val label = edgeWithScore.label
+
+            /** Select */
+            val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
+
+//            val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+            val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
+
+            val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
+            /** OrderBy */
+            val orderByValues =
+             if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
+              else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
+
+            /** StepGroupBy */
+            val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
+
+            /** GroupBy */
+            val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
+
+            /** FilterOut */
+            val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
+
+            newEdgeWithScore.copy(orderByValues = orderByValues,
+              stepGroupByValues = stepGroupByValues,
+              groupByValues = groupByValues,
+              filterOutValues = filterOutValues)
+          }
+
+          /** process step group by */
+          val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
+
+          /** process ordered list */
+          val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
+
+          /** process grouped list */
+          val grouped =
+          if (queryOption.groupBy.keys.isEmpty) Nil
+          else {
+            val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]()
+            results.groupBy { edgeWithScore =>
+              //                edgeWithScore.groupByValues.map(_.map(_.toString))
+              edgeWithScore.groupByValues
+            }.foreach { case (k, ls) =>
+              val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption)
+
+              val newScoreSum = scoreSum
+
+              /**
+                * watch out here. by calling toString on Any, we lose type information which will be used
+                * later for toJson.
+                */
+              if (merged.nonEmpty) {
+                val newKey = merged.head.groupByValues
+                agg += (newKey -> (newScoreSum, merged))
+              }
+            }
+            agg.toSeq.sortBy(_._2._1 * -1)
+          }
+
+          StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount)
+        }
+      }
+    }
+  }
+
+  private def toEdgeWithScores(queryRequest: QueryRequest,
+                               stepResult: StepResult,
+                               parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = {
+    val queryOption = queryRequest.query.queryOption
+    val queryParam = queryRequest.queryParam
+    val prevScore = queryRequest.prevStepScore
+    val labelWeight = queryRequest.labelWeight
+    val edgeWithScores = stepResult.edgeWithScores
+
+    val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
+    val parents = if (shouldBuildParents) {
+      parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore =>
+        val edge = edgeWithScore.edge
+        val score = edgeWithScore.score
+        val label = edgeWithScore.label
+
+        /** Select */
+        val mergedPropsWithTs =
+          if (queryOption.selectColumns.isEmpty) {
+            edge.propertyValuesInner()
+          } else {
+            val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp))
+            edge.propertyValues(queryOption.selectColumns) ++ initial
+          }
+
+        val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
+        edgeWithScore.copy(edge = newEdge)
+      }
+    } else Nil
+
+    // skip
+    if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores
+    else {
+      val degreeScore = 0.0
+
+      val sampled =
+        if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+        else edgeWithScores
+
+      val withScores = for {
+        edgeWithScore <- sampled
+      } yield {
+        val edge = edgeWithScore.edge
+        val edgeScore = edgeWithScore.score
+        val score = queryParam.scorePropagateOp match {
+          case "plus" => edgeScore + prevScore
+          case "divide" =>
+            if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+            else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
+          case _ => edgeScore * prevScore
+        }
+
+        val tsVal = processTimeDecay(queryParam, edge)
+        val newScore = degreeScore + score
+        //          val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
+        val newEdge = edge.copy(parentEdges = parents)
+        edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
+      }
+
+      val normalized =
+        if (queryParam.shouldNormalize) normalize(withScores)
+        else withScores
+
+      normalized
+    }
+  }
+
+  private def buildResult[T](query: Query,
+                             stepIdx: Int,
+                             stepResultLs: Seq[(QueryRequest, StepResult)],
+                             parentEdges: Map[VertexId, Seq[EdgeWithScore]])
+                            (createFunc: (EdgeWithScore, Seq[LabelMeta]) => T)
+                            (implicit ev: WithScore[T]): ListBuffer[T] = {
+    import scala.collection._
+
+    val results = ListBuffer.empty[T]
+    val sequentialLs: ListBuffer[(HashKey, FilterHashKey, T, QueryParam)] = ListBuffer.empty
+    val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, T)]] = mutable.HashMap.empty
+    val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
+    val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
+
+    var numOfDuplicates = 0
+    val queryOption = query.queryOption
+    val step = query.steps(stepIdx)
+    val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet
+    val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet
+
+    stepResultLs.foreach { case (queryRequest, stepInnerResult) =>
+      val queryParam = queryRequest.queryParam
+      val label = queryParam.label
+      val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir)
+      val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir)
+
+      val propsSelectColumns = (for {
+        column <- queryOption.propsSelectColumns
+        labelMeta <- label.metaPropsInvMap.get(column)
+      } yield labelMeta)
+
+      for {
+        edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges)
+      } {
+        val edge = edgeWithScore.edge
+        val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
+        //        params += (hashKey -> queryParam) //
+
+        /** check if this edge should be exlcuded. */
+        if (shouldBeExcluded) {
+          edgesToExclude.add(filterHashKey)
+        } else {
+          if (shouldBeIncluded) {
+            edgesToInclude.add(filterHashKey)
+          }
+          val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns)
+
+          sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
+          duplicates.get(hashKey) match {
+            case None =>
+              val newLs = ListBuffer.empty[(FilterHashKey, T)]
+              newLs += (filterHashKey -> newEdgeWithScore)
+              duplicates += (hashKey -> newLs) //
+            case Some(old) =>
+              numOfDuplicates += 1
+              old += (filterHashKey -> newEdgeWithScore) //
+          }
+        }
+      }
+    }
+
+
+    if (numOfDuplicates == 0) {
+      // no duplicates at all.
+      for {
+        (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs
+        if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+      } {
+        results += edgeWithScore
+      }
+    } else {
+      // need to resolve duplicates.
+      val seen = new mutable.HashSet[HashKey]()
+      for {
+        (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs
+        if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+        if !seen.contains(hashKey)
+      } {
+        //        val queryParam = params(hashKey)
+        processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) =>
+          if (ev.score(duplicate) >= queryParam.threshold) {
+            seen += hashKey
+            results += duplicate
+          }
+        }
+      }
+    }
+    results
+  }
+
+}
+
+class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph {
+
+  import S2Graph._
+
+  val config = _config.withFallback(S2Graph.DefaultConfig)
+
+  Model.apply(config)
+  Model.loadCache()
+
+  val MaxRetryNum = config.getInt("max.retry.number")
+  val MaxBackOff = config.getInt("max.back.off")
+  val BackoffTimeout = config.getInt("back.off.timeout")
+  val DeleteAllFetchCount = config.getInt("delete.all.fetch.count")
+  val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
+  val FailProb = config.getDouble("hbase.fail.prob")
+  val LockExpireDuration = config.getInt("lock.expire.time")
+  val MaxSize = config.getInt("future.cache.max.size")
+  val ExpireAfterWrite = config.getInt("future.cache.expire.after.write")
+  val ExpireAfterAccess = config.getInt("future.cache.expire.after.access")
+  val WaitTimeout = Duration(60, TimeUnit.SECONDS)
+  val scheduledEx = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
+
+  private def confWithFallback(conf: Config): Config = {
+    conf.withFallback(config)
+  }
+
+  /**
+    * TODO: we need to some way to handle malformed configuration for storage.
+    */
+  val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = {
+    val labels = Label.findAll()
+    val services = Service.findAll()
+
+    val labelConfigs = labels.flatMap(_.toStorageConfig)
+    val serviceConfigs = services.flatMap(_.toStorageConfig)
+
+    val configs = (labelConfigs ++ serviceConfigs).map { conf =>
+      confWithFallback(conf)
+    }.toSet
+
+    val pools = new java.util.HashMap[Config, Storage[_, _]]()
+    configs.foreach { config =>
+      pools.put(config, S2Graph.initStorage(this, config)(ec))
+    }
+
+    val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]()
+
+    labels.foreach { label =>
+      if (label.storageConfigOpt.isDefined) {
+        m += (s"label:${label.label}" -> pools(label.storageConfigOpt.get))
+      }
+    }
+
+    services.foreach { service =>
+      if (service.storageConfigOpt.isDefined) {
+        m += (s"service:${service.serviceName}" -> pools(service.storageConfigOpt.get))
+      }
+    }
+
+    m
+  }
+
+  val defaultStorage: Storage[_, _] = S2Graph.initStorage(this, config)(ec)
+
+  /** QueryLevel FutureCache */
+  val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty)
+
+  for {
+    entry <- config.entrySet() if S2Graph.DefaultConfigs.contains(entry.getKey)
+    (k, v) = (entry.getKey, entry.getValue)
+  } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
+
+  def getStorage(service: Service): Storage[_, _] = {
+    storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
+  }
+
+  def getStorage(label: Label): Storage[_, _] = {
+    storagePool.getOrElse(s"label:${label.label}", defaultStorage)
+  }
+
+  def flushStorage(): Unit = {
+    storagePool.foreach { case (_, storage) =>
+
+      /** flush is blocking */
+      storage.flush()
+    }
+  }
+
+  def fallback = Future.successful(StepResult.Empty)
+
+  def checkEdges(edges: Seq[S2Edge]): Future[StepResult] = {
+    val futures = for {
+      edge <- edges
+    } yield {
+      fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
+        edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+      }
+    }
+
+    Future.sequence(futures).map { edgeWithScoreLs =>
+      val edgeWithScores = edgeWithScoreLs.flatten
+      StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil)
+    }
+  }
+
+  //  def checkEdges(edges: Seq[Edge]): Future[StepResult] = storage.checkEdges(edges)
+
+  def getEdges(q: Query): Future[StepResult] = {
+    Try {
+      if (q.steps.isEmpty) {
+        // TODO: this should be get vertex query.
+        fallback
+      } else {
+        val filterOutFuture = q.queryOption.filterOutQuery match {
+          case None => fallback
+          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+        }
+        for {
+          stepResult <- getEdgesStepInner(q)
+          filterOutInnerResult <- filterOutFuture
+        } yield {
+          if (filterOutInnerResult.isEmpty) stepResult
+          else StepResult.filterOut(this, q.queryOption, stepResult, filterOutInnerResult)
+        }
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesAsync: $e", e)
+        fallback
+    } get
+  }
+
+  def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
+    Try {
+      if (q.steps.isEmpty) fallback
+      else {
+
+        val queryOption = q.queryOption
+        def fetch: Future[StepResult] = {
+          val startStepInnerResult = QueryResult.fromVertices(this, q)
+          q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
+            for {
+              prevStepInnerResult <- prevStepInnerResultFuture
+              currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult)
+            } yield {
+              currentStepInnerResult.copy(
+                accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors,
+                failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount
+              )
+            }
+          }
+        }
+
+        fetch
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesAsync: $e", e)
+        fallback
+    } get
+  }
+
+  def fetchStep(orgQuery: Query,
+                stepIdx: Int,
+                stepInnerResult: StepResult,
+                buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
+    if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
+    else {
+      val edgeWithScoreLs = stepInnerResult.edgeWithScores
+
+      val q = orgQuery
+      val queryOption = orgQuery.queryOption
+      val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
+      val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
+      val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
+      val step = q.steps(stepIdx)
+
+     val alreadyVisited =
+        if (stepIdx == 0) Map.empty[(LabelWithDirection, S2Vertex), Boolean]
+        else alreadyVisitedVertices(stepInnerResult.edgeWithScores)
+
+      val initial = (Map.empty[S2Vertex, Double], Map.empty[S2Vertex, ArrayBuffer[EdgeWithScore]])
+      val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) =>
+        val key = edgeWithScore.edge.tgtVertex
+        val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score
+        val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore])
+        buffer += edgeWithScore
+        (sum + (key -> newScore), group + (key -> buffer))
+      }
+      val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold)
+      val prevStepTgtVertexIdEdges = grouped.map(t => t._1.id -> t._2)
+
+      val nextStepSrcVertices = if (prevStepLimit >= 0) {
+        groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
+      } else {
+        groupedByFiltered.toSeq
+      }
+
+      val queryRequests = for {
+        (vertex, prevStepScore) <- nextStepSrcVertices
+        queryParam <- step.queryParams
+      } yield {
+        val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
+        val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0
+        QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight)
+      }
+
+      val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
+
+      filterEdges(orgQuery, stepIdx, queryRequests,
+        fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec)
+    }
+  }
+
+
+  /**
+    * responsible to fire parallel fetch call into storage and create future that will return merged result.
+    *
+    * @param queryRequests
+    * @param prevStepEdges
+    * @return
+    */
+  def fetches(queryRequests: Seq[QueryRequest],
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
+
+    val reqWithIdxs = queryRequests.zipWithIndex
+    val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label)
+    val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
+      for {
+        prev <- prevFuture
+        cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
+      } yield {
+        prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
+      }
+    }
+    aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) }
+  }
+
+
+  def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
+    Try {
+      if (mq.queries.isEmpty) fallback
+      else {
+        val filterOutFuture = mq.queryOption.filterOutQuery match {
+          case None => fallback
+          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+        }
+
+        val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
+        for {
+          multiQueryResults <- multiQueryFutures
+          filterOutInnerResult <- filterOutFuture
+        } yield {
+          StepResult.merges(mq.queryOption, multiQueryResults, mq.weights, filterOutInnerResult)
+        }
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesAsync: $e", e)
+        fallback
+    } get
+  }
+
+
+  def fetchSnapshotEdge(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
+    /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
+      * so use empty cacheKey.
+      * */
+    val queryParam = QueryParam(labelName = edge.innerLabel.label,
+      direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
+      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
+      cacheTTLInMillis = -1)
+    val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
+    val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
+
+    val storage = getStorage(edge.innerLabel)
+    storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
+      val (edgeOpt, kvOpt) =
+        if (kvs.isEmpty) (None, None)
+        else {
+          val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
+          val _kvOpt = kvs.headOption
+          (snapshotEdgeOpt, _kvOpt)
+        }
+      (queryParam, edgeOpt, kvOpt)
+    } recoverWith { case ex: Throwable =>
+      logger.error(s"fetchQueryParam failed. fallback return.", ex)
+      throw FetchTimeoutException(s"${edge.toLogString}")
+    }
+  }
+
+  def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
+    val verticesWithIdx = vertices.zipWithIndex
+    val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
+      getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
+    }
+
+    Future.sequence(futures).map { ls =>
+      ls.flatten.toSeq.sortBy(_._2).map(_._1)
+    }
+  }
+
+  /** mutate */
+  def deleteAllAdjacentEdges(srcVertices: Seq[S2Vertex],
+                             labels: Seq[Label],
+                             dir: Int,
+                             ts: Long): Future[Boolean] = {
+
+    val requestTs = ts
+    val vertices = srcVertices
+    /** create query per label */
+    val queries = for {
+      label <- labels
+    } yield {
+      val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir),
+        offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw)
+      val step = Step(List(queryParam))
+      Query(vertices, Vector(step))
+    }
+
+    //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
+    val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
+      fetchAndDeleteAll(queries, requestTs)
+    } { case (allDeleted, deleteSuccess) =>
+      allDeleted && deleteSuccess
+    }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
+
+    retryFuture onFailure {
+      case ex =>
+        logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
+    }
+
+    retryFuture
+  }
+
+  def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
+    val future = for {
+      stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true)))
+      (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
+    } yield {
+      //        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
+      (allDeleted, ret)
+    }
+
+    Extensions.retryOnFailure(MaxRetryNum) {
+      future
+    } {
+      logger.error(s"fetch and deleteAll failed.")
+      (true, false)
+    }
+
+  }
+
+  def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
+                              requestTs: Long): Future[(Boolean, Boolean)] = {
+    stepInnerResultLs.foreach { stepInnerResult =>
+      if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
+    }
+    val futures = for {
+      stepInnerResult <- stepInnerResultLs
+      deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
+      if deleteStepInnerResult.edgeWithScores.nonEmpty
+    } yield {
+      val head = deleteStepInnerResult.edgeWithScores.head
+      val label = head.edge.innerLabel
+      val ret = label.schemaVersion match {
+        case HBaseType.VERSION3 | HBaseType.VERSION4 =>
+          if (label.consistencyLevel == "strong") {
+            /**
+              * read: snapshotEdge on queryResult = O(N)
+              * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
+              */
+            mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity))
+          } else {
+            getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
+          }
+        case _ =>
+
+          /**
+            * read: x
+            * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
+            */
+          getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
+      }
+      ret
+    }
+
+    if (futures.isEmpty) {
+      // all deleted.
+      Future.successful(true -> true)
+    } else {
+      Future.sequence(futures).map { rets => false -> rets.forall(identity) }
+    }
+  }
+
+  def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = {
+    val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
+      (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
+    }
+    if (filtered.isEmpty) StepResult.Empty
+    else {
+      val head = filtered.head
+      val label = head.edge.innerLabel
+      val edgeWithScoreLs = filtered.map { edgeWithScore =>
+          val edge = edgeWithScore.edge
+          val copiedEdge = label.consistencyLevel match {
+            case "strong" =>
+              edge.copyEdge(op = GraphUtil.operations("delete"),
+                version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
+            case _ =>
+              edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
+          }
+//        val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
+//          case "strong" =>
+//            val edge = edgeWithScore.edge
+//            edge.property(LabelMeta.timestamp.name, requestTs)
+//            val _newPropsWithTs = edge.updatePropsWithTs()
+//
+//            (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
+//          case _ =>
+//            val oldEdge = edgeWithScore.edge
+//            (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs())
+//        }
+//
+//        val copiedEdge =
+//          edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
+
+        val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
+        //      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
+        edgeToDelete
+      }
+      //Degree edge?
+      StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false)
+    }
+  }
+
+  //  def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
+  //    storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
+
+  def mutateElements(elements: Seq[GraphElement],
+                     withWait: Boolean = false): Future[Seq[Boolean]] = {
+
+    val edgeBuffer = ArrayBuffer[(S2Edge, Int)]()
+    val vertexBuffer = ArrayBuffer[(S2Vertex, Int)]()
+
+    elements.zipWithIndex.foreach {
+      case (e: S2Edge, idx: Int) => edgeBuffer.append((e, idx))
+      case (v: S2Vertex, idx: Int) => vertexBuffer.append((v, idx))
+      case any@_ => logger.error(s"Unknown type: ${any}")
+    }
+
+    val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result =>
+      edgeBuffer.map(_._2).zip(result)
+    }
+
+    val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result =>
+      vertexBuffer.map(_._2).zip(result)
+    }
+
+    val graphFuture = for {
+      edgesMutated <- edgeFutureWithIdx
+      verticesMutated <- vertexFutureWithIdx
+    } yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2)
+
+    graphFuture
+
+  }
+
+  //  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
+
+  def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[Boolean]] = {
+    val edgeWithIdxs = edges.zipWithIndex
+
+    val (strongEdges, weakEdges) =
+      edgeWithIdxs.partition { case (edge, idx) =>
+        val e = edge
+        e.innerLabel.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")
+      }
+
+    val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
+      val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) =>
+        val storage = getStorage(label)
+        val edges = edgeGroup.map(_._1)
+        val idxs = edgeGroup.map(_._2)
+
+        /** multiple edges with weak consistency level will be processed as batch */
+        val mutations = edges.flatMap { edge =>
+          val (_, edgeUpdate) =
+            if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
+            else S2Edge.buildOperation(None, Seq(edge))
+
+          storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate) ++ storage.snapshotEdgeMutations(edgeUpdate) ++ storage.increments(edgeUpdate)
+        }
+
+        storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+          idxs.map(idx => idx -> ret)
+        }
+      }
+      Future.sequence(futures)
+    }
+    val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") }
+
+    val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
+      deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _)
+    }
+
+    val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) =>
+      val edges = edgeGroup.map(_._1)
+      val idxs = edgeGroup.map(_._2)
+      val storage = getStorage(label)
+      storage.mutateStrongEdges(edges, withWait = true).map { rets =>
+        idxs.zip(rets)
+      }
+    }
+
+    for {
+      weak <- Future.sequence(weakEdgesFutures)
+      deleteAll <- Future.sequence(deleteAllFutures)
+      strong <- Future.sequence(strongEdgesFutures)
+    } yield {
+      (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2)
+    }
+  }
+
+  def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = {
+    val verticesWithIdx = vertices.zipWithIndex
+    val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
+      getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
+    }
+    Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
+  }
+
+  def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
+    val edgesWithIdx = edges.zipWithIndex
+    val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
+      getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+    }
+    Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
+  }
+
+  def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[Boolean] = {
+    val label = edge.innerLabel
+
+    val storage = getStorage(label)
+    val kvs = storage.buildDegreePuts(edge, degreeVal)
+
+    storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = true)
+  }
+
+  def shutdown(): Unit = {
+    flushStorage()
+    Model.shutdown()
+  }
+
+  def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
+    val parts = GraphUtil.split(s)
+    val logType = parts(2)
+    val element = if (logType == "edge" | logType == "e") {
+      /** current only edge is considered to be bulk loaded */
+      labelMapping.get(parts(5)) match {
+        case None =>
+        case Some(toReplace) =>
+          parts(5) = toReplace
+      }
+      toEdge(parts)
+    } else if (logType == "vertex" | logType == "v") {
+      toVertex(parts)
+    } else {
+      throw new GraphExceptions.JsonParseException("log type is not exist in log.")
+    }
+
+    element
+  } recover {
+    case e: Exception =>
+      logger.error(s"[toElement]: $e", e)
+      None
+  } get
+
+
+  def toVertex(s: String): Option[S2Vertex] = {
+    toVertex(GraphUtil.split(s))
+  }
+
+  def toEdge(s: String): Option[S2Edge] = {
+    toEdge(GraphUtil.split(s))
+  }
+
+  def toEdge(parts: Array[String]): Option[S2Edge] = Try {
+    val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
+    val tempDirection = if (parts.length >= 8) parts(7) else "out"
+    val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
+    val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
+    Option(edge)
+  } recover {
+    case e: Exception =>
+      logger.error(s"[toEdge]: $e", e)
+      throw e
+  } get
+
+  def toVertex(parts: Array[String]): Option[S2Vertex] = Try {
+    val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
+    val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
+    Option(vertex)
+  } recover {
+    case e: Throwable =>
+      logger.error(s"[toVertex]: $e", e)
+      throw e
+  } get
+
+  def toEdge(srcId: Any,
+             tgtId: Any,
+             labelName: String,
+             direction: String,
+             props: Map[String, Any] = Map.empty,
+             ts: Long = System.currentTimeMillis(),
+             operation: String = "insert"): S2Edge = {
+    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+
+    val srcVertexIdInnerVal = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion)
+    val tgtVertexIdInnerVal = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion)
+
+    val srcVertex = newVertex(SourceVertexId(label.srcColumn, srcVertexIdInnerVal), System.currentTimeMillis())
+    val tgtVertex = newVertex(TargetVertexId(label.tgtColumn, tgtVertexIdInnerVal), System.currentTimeMillis())
+    val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+
+    val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
+    val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+    new S2Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs)
+  }
+
+  def toVertex(serviceName: String,
+               columnName: String,
+               id: Any,
+               props: Map[String, Any] = Map.empty,
+               ts: Long = System.currentTimeMillis(),
+               operation: String = "insert"): S2Vertex = {
+
+    val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+    val srcVertexId = VertexId(column, toInnerVal(id.toString, column.columnType, column.schemaVersion))
+    val propsInner = column.propsToInnerVals(props) ++
+      Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion))
+
+    val vertex = new S2Vertex(this, srcVertexId, ts, S2Vertex.EmptyProps, op)
+    S2Vertex.fillPropsWithTs(vertex, propsInner)
+    vertex
+  }
+
+  /**
+   * helper to create new Edge instance from given parameters on memory(not actually stored in storage).
+   *
+   * Since we are using mutable map for property value(propsWithTs),
+   * we should make sure that reference for mutable map never be shared between multiple Edge instances.
+   * To guarantee this, we never create Edge directly, but rather use this helper which is available on S2Graph.
+   *
+   * Note that we are using following convention
+   * 1. `add*` for method that actually store instance into storage,
+   * 2. `new*` for method that only create instance on memory, but not store it into storage.
+   *
+   * @param srcVertex
+   * @param tgtVertex
+   * @param innerLabel
+   * @param dir
+   * @param op
+   * @param version
+   * @param propsWithTs
+   * @param parentEdges
+   * @param originalEdgeOpt
+   * @param pendingEdgeOpt
+   * @param statusCode
+   * @param lockTs
+   * @param tsInnerValOpt
+   * @return
+   */
+  def newEdge(srcVertex: S2Vertex,
+              tgtVertex: S2Vertex,
+              innerLabel: Label,
+              dir: Int,
+              op: Byte = GraphUtil.defaultOpByte,
+              version: Long = System.currentTimeMillis(),
+              propsWithTs: S2Edge.State,
+              parentEdges: Seq[EdgeWithScore] = Nil,
+              originalEdgeOpt: Option[S2Edge] = None,
+              pendingEdgeOpt: Option[S2Edge] = None,
+              statusCode: Byte = 0,
+              lockTs: Option[Long] = None,
+              tsInnerValOpt: Option[InnerValLike] = None): S2Edge = {
+    val edge = new S2Edge(this, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps,
+      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+    S2Edge.fillPropsWithTs(edge, propsWithTs)
+    edge
+  }
+
+  /**
+   * helper to create new SnapshotEdge instance from given parameters on memory(not actually stored in storage).
+   *
+   * Note that this is only available to S2Graph, not structure.Graph so only internal code should use this method.
+   * @param srcVertex
+   * @param tgtVertex
+   * @param label
+   * @param dir
+   * @param op
+   * @param version
+   * @param propsWithTs
+   * @param pendingEdgeOpt
+   * @param statusCode
+   * @param lockTs
+   * @param tsInnerValOpt
+   * @return
+   */
+  private[core] def newSnapshotEdge(srcVertex: S2Vertex,
+                                    tgtVertex: S2Vertex,
+                                    label: Label,
+                                    dir: Int,
+                                    op: Byte,
+                                    version: Long,
+                                    propsWithTs: S2Edge.State,
+                                    pendingEdgeOpt: Option[S2Edge],
+                                    statusCode: Byte = 0,
+                                    lockTs: Option[Long],
+                                    tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
+    val snapshotEdge = new SnapshotEdge(this, srcVertex, tgtVertex, label, dir, op, version, S2Edge.EmptyProps,
+      pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+    S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs)
+    snapshotEdge
+  }
+
+  /**
+   * internal helper to actually store a single edge based on given peramters.
+   *
+   * Note that this is used from S2Vertex to implement blocking interface from Tp3.
+   * Once tp3 provide AsyncStep, then this can be changed to return Java's CompletableFuture.
+   *
+   * @param srcVertex
+   * @param tgtVertex
+   * @param labelName
+   * @param direction
+   * @param props
+   * @param ts
+   * @param operation
+   * @return
+   */
+  private[core] def addEdgeInner(srcVertex: S2Vertex,
+                                 tgtVertex: S2Vertex,
+                                 labelName: String,
+                                 direction: String = "out",
+                                 props: Map[String, AnyRef] = Map.empty,
+                                 ts: Long = System.currentTimeMillis(),
+                                 operation: String = "insert"): S2Edge = {
+    Await.result(addEdgeInnerAsync(srcVertex, tgtVertex, labelName, direction, props, ts, operation), WaitTimeout)
+  }
+
+  private[core] def addEdgeInnerAsync(srcVertex: S2Vertex,
+                                      tgtVertex: S2Vertex,
+                                      labelName: String,
+                                      direction: String = "out",
+                                      props: Map[String, AnyRef] = Map.empty,
+                                      ts: Long = System.currentTimeMillis(),
+                                      operation: String = "insert"): Future[S2Edge] = {
+    // Validations on input parameter
+    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+    val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+//    if (srcVertex.id.column != label.srcColumnWithDir(dir)) throw new RuntimeException(s"srcVertex's column[${srcVertex.id.column}] is not matched to label's srcColumn[${label.srcColumnWithDir(dir)}")
+//    if (tgtVertex.id.column != label.tgtColumnWithDir(dir)) throw new RuntimeException(s"tgtVertex's column[${tgtVertex.id.column}] is not matched to label's tgtColumn[${label.tgtColumnWithDir(dir)}")
+
+    // Convert given Map[String, AnyRef] property into internal class.
+    val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
+    val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+    val edge = newEdge(srcVertex, tgtVertex, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
+    // store edge into storage withWait option.
+    mutateEdges(Seq(edge), withWait = true).map { rets =>
+      if (!rets.headOption.getOrElse(false)) throw new RuntimeException("add edge failed.")
+      else edge
+    }
+  }
+
+
+  def newVertexId(serviceName: String)(columnName: String)(id: Any): VertexId = {
+    val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+    newVertexId(service, column, id)
+  }
+
+  /**
+   * helper to create S2Graph's internal VertexId instance with given parameters.
+   * @param service
+   * @param column
+   * @param id
+   * @return
+   */
+  def newVertexId(service: Service,
+                  column: ServiceColumn,
+                  id: Any): VertexId = {
+    val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(id)(column.schemaVersion)
+    new VertexId(column, innerVal)
+  }
+
+  def newVertex(id: VertexId,
+                ts: Long = System.currentTimeMillis(),
+                props: S2Vertex.Props = S2Vertex.EmptyProps,
+                op: Byte = 0,
+                belongLabelIds: Seq[Int] = Seq.empty): S2Vertex = {
+    val vertex = new S2Vertex(this, id, ts, S2Vertex.EmptyProps, op, belongLabelIds)
+    S2Vertex.fillPropsWithTs(vertex, props)
+    vertex
+  }
+
+  def getVertex(vertexId: VertexId): Option[S2Vertex] = {
+    val v = newVertex(vertexId)
+    Await.result(getVertices(Seq(v)).map { vertices => vertices.headOption }, WaitTimeout)
+  }
+
+  def fetchEdges(vertex: S2Vertex, labelNames: Seq[String], direction: String = "out"): util.Iterator[Edge] = {
+    Await.result(fetchEdgesAsync(vertex, labelNames, direction), WaitTimeout)
+  }
+
+  def fetchEdgesAsync(vertex: S2Vertex, labelNames: Seq[String], direction: String = "out"): Future[util.Iterator[Edge]] = {
+    val queryParams = labelNames.map { l =>
+      QueryParam(labelName = l, direction = direction)
+    }
+    val query = Query.toQuery(Seq(vertex), queryParams)
+    getEdges(query).map { stepResult =>
+      val ls = new util.ArrayList[Edge]()
+      stepResult.edgeWithScores.foreach(es => ls.add(es.edge))
+      ls.iterator()
+    }
+  }
+
+  override def vertices(vertexIds: AnyRef*): util.Iterator[structure.Vertex] = {
+    val vertices = for {
+      vertexId <- vertexIds if vertexId.isInstanceOf[VertexId]
+    } yield newVertex(vertexId.asInstanceOf[VertexId])
+
+    val future = getVertices(vertices).map { vs =>
+      val ls = new util.ArrayList[structure.Vertex]()
+      ls.addAll(vs)
+      ls.iterator()
+    }
+    Await.result(future, WaitTimeout)
+  }
+
+  override def tx(): Transaction = ???
+
+  override def edges(objects: AnyRef*): util.Iterator[structure.Edge] = ???
+
+  override def variables(): Variables = ???
+
+  override def configuration(): Configuration = ???
+
+  override def addVertex(kvs: AnyRef*): structure.Vertex = {
+    val kvsMap = ElementHelper.asMap(kvs: _*).asScala.toMap
+    val id = kvsMap.getOrElse(T.id.toString, throw new RuntimeException("T.id is required."))
+    val serviceColumnNames = kvsMap.getOrElse(T.label.toString, throw new RuntimeException("ServiceName::ColumnName is required.")).toString
+    val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter)
+    if (names.length != 2) throw new RuntimeException("malformed data on vertex label.")
+    val serviceName = names(0)
+    val columnName = names(1)
+
+    val vertex = toVertex(serviceName, columnName, id, kvsMap)
+    val future = mutateVertices(Seq(vertex), withWait = true).map { vs =>
+      if (vs.forall(identity)) vertex
+      else throw new RuntimeException("addVertex failed.")
+    }
+    Await.result(future, WaitTimeout)
+  }
+
+  def addVertex(id: VertexId,
+                ts: Long = System.currentTimeMillis(),
+                props: S2Vertex.Props = S2Vertex.EmptyProps,
+                op: Byte = 0,
+                belongLabelIds: Seq[Int] = Seq.empty): S2Vertex = {
+    val vertex = newVertex(id, ts, props, op, belongLabelIds)
+    val future = mutateVertices(Seq(vertex), withWait = true).map { rets =>
+      if (rets.forall(identity)) vertex
+      else throw new RuntimeException("addVertex failed.")
+    }
+    Await.result(future, WaitTimeout)
+  }
+
+  override def close(): Unit = {
+    shutdown()
+  }
+
+  override def compute[C <: GraphComputer](aClass: Class[C]): C = ???
+
+  override def compute(): GraphComputer = ???
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index 46f5ecf..6a47e46 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -27,16 +27,18 @@ import org.apache.tinkerpop.gremlin.structure.{Property}
 import scala.util.hashing.MurmurHash3
 
 
-case class S2Property[V](element: Edge,
+case class S2Property[V](element: S2Edge,
                          labelMeta: LabelMeta,
                          key: String,
-                         value: V,
+                         v: V,
                          ts: Long) extends Property[V] {
 
   import CanInnerValLike._
   lazy val innerVal = anyToInnerValLike.toInnerVal(value)(element.innerLabel.schemaVersion)
   lazy val innerValWithTs = InnerValLikeWithTs(innerVal, ts)
 
+  val value = castValue(v, labelMeta.dataType).asInstanceOf[V]
+
   def bytes: Array[Byte] = {
     innerVal.bytes
   }
@@ -64,4 +66,5 @@ case class S2Property[V](element: Edge,
   override def toString(): String = {
     Map("labelMeta" -> labelMeta.toString, "key" -> key, "value" -> value, "ts" -> ts).toString
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
new file mode 100644
index 0000000..7fd2ac4
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import java.util
+import java.util.function.{Consumer, BiConsumer}
+
+import org.apache.s2graph.core.S2Vertex.Props
+import org.apache.s2graph.core.mysqls.{ColumnMeta, LabelMeta, Service, ServiceColumn}
+import org.apache.s2graph.core.types._
+import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
+import org.apache.tinkerpop.gremlin.structure.util.ElementHelper
+import org.apache.tinkerpop.gremlin.structure.{Direction, Vertex, Edge, VertexProperty}
+import play.api.libs.json.Json
+import scala.collection.JavaConverters._
+
+case class S2Vertex(graph: S2Graph,
+                  id: VertexId,
+                  ts: Long = System.currentTimeMillis(),
+                  props: Props = S2Vertex.EmptyProps,
+                  op: Byte = 0,
+                  belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement with Vertex {
+
+  val innerId = id.innerId
+
+  val innerIdVal = innerId.value
+
+  lazy val properties = for {
+    (k, v) <- props.asScala
+  } yield v.columnMeta.name -> v.value
+
+  def schemaVer = serviceColumn.schemaVersion
+
+  def serviceColumn = ServiceColumn.findById(id.colId)
+
+  def columnName = serviceColumn.columnName
+
+  lazy val service = Service.findById(serviceColumn.serviceId)
+
+  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName)
+
+  def defaultProps = {
+    val default = S2Vertex.EmptyProps
+    val newProps = new S2VertexProperty(this, ColumnMeta.lastModifiedAtColumn, ColumnMeta.lastModifiedAtColumn.name, ts)
+    default.put(ColumnMeta.lastModifiedAtColumn.name, newProps)
+    default
+  }
+
+  //  lazy val kvs = Graph.client.vertexSerializer(this).toKeyValues
+
+  /** TODO: make this as configurable */
+  override def serviceName = service.serviceName
+
+  override def isAsync = false
+
+  override def queueKey = Seq(ts.toString, serviceName).mkString("|")
+
+  override def queuePartitionKey = id.innerId.toString
+
+  def propsWithName = for {
+    (k, v) <- props.asScala
+  } yield (v.columnMeta.name -> v.value.toString)
+
+  override def hashCode() = {
+    val hash = id.hashCode()
+    //    logger.debug(s"Vertex.hashCode: $this -> $hash")
+    hash
+  }
+
+  override def equals(obj: Any) = {
+    obj match {
+      case otherVertex: S2Vertex =>
+        val ret = id == otherVertex.id
+        //        logger.debug(s"Vertex.equals: $this, $obj => $ret")
+        ret
+      case _ => false
+    }
+  }
+
+  override def toString(): String = {
+    Map("id" -> id.toString(), "ts" -> ts, "props" -> "", "op" -> op, "belongLabelIds" -> belongLabelIds).toString()
+  }
+
+  def toLogString(): String = {
+    val (serviceName, columnName) =
+      if (!id.storeColId) ("", "")
+      else (serviceColumn.service.serviceName, serviceColumn.columnName)
+
+    if (propsWithName.nonEmpty)
+      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t")
+    else
+      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName).mkString("\t")
+  }
+
+  def copyVertexWithState(props: Props): S2Vertex = {
+    val newVertex = copy(props = S2Vertex.EmptyProps)
+    S2Vertex.fillPropsWithTs(newVertex, props)
+    newVertex
+  }
+
+  override def vertices(direction: Direction, edgeLabels: String*): util.Iterator[Vertex] = {
+    val arr = new util.ArrayList[Vertex]()
+    edges(direction, edgeLabels: _*).forEachRemaining(new Consumer[Edge] {
+      override def accept(edge: Edge): Unit = {
+        direction match {
+          case Direction.OUT => arr.add(edge.inVertex())
+          case Direction.IN => arr.add(edge.outVertex())
+          case _ =>
+            arr.add(edge.inVertex())
+            arr.add(edge.outVertex())
+        }
+      }
+    })
+    arr.iterator()
+  }
+
+  override def edges(direction: Direction, labelNames: String*): util.Iterator[Edge] = {
+    graph.fetchEdges(this, labelNames, direction.name())
+  }
+
+  override def property[V](cardinality: Cardinality, key: String, value: V, objects: AnyRef*): VertexProperty[V] = {
+    cardinality match {
+      case Cardinality.single =>
+        val columnMeta = serviceColumn.metasInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Vertex."))
+        val newProps = new S2VertexProperty[V](this, columnMeta, key, value)
+        props.put(key, newProps)
+        newProps
+      case _ => throw new RuntimeException("only single cardinality is supported.")
+    }
+  }
+
+  override def addEdge(label: String, vertex: Vertex, kvs: AnyRef*): S2Edge = {
+    vertex match {
+      case otherV: S2Vertex =>
+        val props = ElementHelper.asMap(kvs: _*).asScala.toMap
+        //TODO: direction, operation, _timestamp need to be reserved property key.
+        val direction = props.get("direction").getOrElse("out").toString
+        val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis())
+        val operation = props.get("operation").map(_.toString).getOrElse("insert")
+
+        graph.addEdgeInner(this, otherV, label, direction, props, ts, operation)
+      case _ => throw new RuntimeException("only S2Graph vertex can be used.")
+    }
+  }
+
+  override def property[V](key: String): VertexProperty[V] = {
+    props.get(key).asInstanceOf[S2VertexProperty[V]]
+  }
+
+  override def properties[V](keys: String*): util.Iterator[VertexProperty[V]] = {
+    val ls = for {
+      key <- keys
+    } yield {
+      property[V](key)
+    }
+    ls.iterator.asJava
+  }
+
+  override def remove(): Unit = ???
+
+  override def label(): String = service.serviceName + S2Vertex.VertexLabelDelimiter + serviceColumn.columnName
+}
+
+object S2Vertex {
+
+  val VertexLabelDelimiter = "::"
+
+  type Props = java.util.Map[String, S2VertexProperty[_]]
+  type State = Map[ColumnMeta, InnerValLike]
+  def EmptyProps = new java.util.HashMap[String, S2VertexProperty[_]]()
+  def EmptyState = Map.empty[ColumnMeta, InnerValLike]
+
+  def toPropKey(labelId: Int): Int = Byte.MaxValue + labelId
+
+  def toLabelId(propKey: Int): Int = propKey - Byte.MaxValue
+
+  def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue
+
+  def fillPropsWithTs(vertex: S2Vertex, props: Props): Unit = {
+    props.forEach(new BiConsumer[String, S2VertexProperty[_]] {
+      override def accept(key: String, p: S2VertexProperty[_]): Unit = {
+        vertex.property(Cardinality.single, key, p.value)
+      }
+    })
+  }
+
+  def fillPropsWithTs(vertex: S2Vertex, state: State): Unit = {
+    state.foreach { case (k, v) => vertex.property(Cardinality.single, k.name, v.value) }
+  }
+
+  def propsToState(props: Props): State = {
+    props.asScala.map { case (k, v) =>
+      v.columnMeta -> v.innerVal
+    }.toMap
+  }
+
+  def stateToProps(vertex: S2Vertex, state: State): Props = {
+    state.foreach { case (k, v) =>
+      vertex.property(k.name, v.value)
+    }
+    vertex.props
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
index 0f9f87b..9f8c682 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
@@ -25,23 +25,44 @@ import org.apache.s2graph.core.mysqls.ColumnMeta
 import org.apache.s2graph.core.types.CanInnerValLike
 import org.apache.tinkerpop.gremlin.structure.{Property, VertexProperty, Vertex => TpVertex}
 
-case class S2VertexProperty[V](element: TpVertex,
+import scala.util.hashing.MurmurHash3
+
+case class S2VertexProperty[V](element: S2Vertex,
                                columnMeta: ColumnMeta,
                                key: String,
-                               value: V) extends VertexProperty[V] {
-  implicit val encodingVer = columnMeta.serviceColumn.schemaVersion
-  val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(value)
+                               v: V) extends VertexProperty[V] {
+  import CanInnerValLike._
+  implicit lazy val encodingVer = element.serviceColumn.schemaVersion
+  lazy val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(value)
   def toBytes: Array[Byte] = {
     innerVal.bytes
   }
 
+  val value = castValue(v, columnMeta.dataType).asInstanceOf[V]
+
   override def properties[U](strings: String*): util.Iterator[Property[U]] = ???
 
-  override def property[V](s: String, v: V): Property[V] = ???
+  override def property[V](key: String, value: V): Property[V] = ???
 
   override def remove(): Unit = ???
 
   override def id(): AnyRef = ???
 
   override def isPresent: Boolean = ???
+
+  override def hashCode(): Int = {
+    MurmurHash3.stringHash(columnMeta.columnId + "," + columnMeta.id.get + "," + key + "," + value)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case p: S2VertexProperty[_] =>
+      columnMeta.columnId == p.columnMeta.columnId &&
+        columnMeta.seq == p.columnMeta.seq &&
+        key == p.key && value == p.value
+    case _ => false
+  }
+
+  override def toString(): String = {
+    Map("columnMeta" -> columnMeta.toString, "key" -> key, "value" -> value).toString
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
deleted file mode 100644
index 57c9824..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import java.util
-
-import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId}
-import org.apache.tinkerpop.gremlin.structure
-import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
-import org.apache.tinkerpop.gremlin.structure.{Vertex => TpVertex, Direction, Edge, VertexProperty, Graph}
-import play.api.libs.json.Json
-
-case class Vertex(graph: Graph,
-                  id: VertexId,
-                  ts: Long = System.currentTimeMillis(),
-                  props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
-                  op: Byte = 0,
-                  belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement with TpVertex {
-
-  val innerId = id.innerId
-
-  val innerIdVal = innerId.value
-
-  lazy val properties = for {
-    (k, v) <- props
-    meta <- serviceColumn.metasMap.get(k)
-  } yield meta.name -> v.value
-
-  def schemaVer = serviceColumn.schemaVersion
-
-  def serviceColumn = ServiceColumn.findById(id.colId)
-
-  def columnName = serviceColumn.columnName
-
-  def service = Service.findById(serviceColumn.serviceId)
-
-  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName)
-
-  def defaultProps = Map(ColumnMeta.lastModifiedAtColumnSeq.toInt -> InnerVal.withLong(ts, schemaVer))
-
-  //  lazy val kvs = Graph.client.vertexSerializer(this).toKeyValues
-
-  /** TODO: make this as configurable */
-  override def serviceName = service.serviceName
-
-  override def isAsync = false
-
-  override def queueKey = Seq(ts.toString, serviceName).mkString("|")
-
-  override def queuePartitionKey = id.innerId.toString
-
-  def propsWithName = for {
-    (seq, v) <- props
-    meta <- ColumnMeta.findByIdAndSeq(id.colId, seq.toByte)
-  } yield (meta.name -> v.toString)
-
-  def toEdgeVertex() = graph.newVertex(SourceVertexId(id.column, innerId), ts, props, op)
-
-
-  override def hashCode() = {
-    val hash = id.hashCode()
-    //    logger.debug(s"Vertex.hashCode: $this -> $hash")
-    hash
-  }
-
-  override def equals(obj: Any) = {
-    obj match {
-      case otherVertex: Vertex =>
-        val ret = id == otherVertex.id
-        //        logger.debug(s"Vertex.equals: $this, $obj => $ret")
-        ret
-      case _ => false
-    }
-  }
-
-  def withProps(newProps: Map[Int, InnerValLike]) = graph.newVertex(id, ts, newProps, op)
-
-  def toLogString(): String = {
-    val (serviceName, columnName) =
-      if (!id.storeColId) ("", "")
-      else (serviceColumn.service.serviceName, serviceColumn.columnName)
-
-    if (propsWithName.nonEmpty)
-      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t")
-    else
-      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName).mkString("\t")
-  }
-
-  override def vertices(direction: Direction, strings: String*): util.Iterator[TpVertex] = ???
-
-  override def edges(direction: Direction, strings: String*): util.Iterator[structure.Edge] = ???
-
-  override def property[V](cardinality: Cardinality, s: String, v: V, objects: AnyRef*): VertexProperty[V] = ???
-
-  override def addEdge(s: String, vertex: TpVertex, objects: AnyRef*): Edge = ???
-
-  override def properties[V](strings: String*): util.Iterator[VertexProperty[V]] = ???
-
-  override def remove(): Unit = ???
-
-  override def label(): String = ???
-}
-
-object Vertex {
-
-  def toPropKey(labelId: Int): Int = Byte.MaxValue + labelId
-
-  def toLabelId(propKey: Int): Int = propKey - Byte.MaxValue
-
-  def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
index f6c174d..09d02d1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
@@ -30,6 +30,8 @@ object ColumnMeta extends Model[ColumnMeta] {
   val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long")
   val maxValue = Byte.MaxValue
 
+  val timestamp = ColumnMeta(None, -1, "_timestamp", timeStampSeq, "long")
+
   def apply(rs: WrappedResultSet): ColumnMeta = {
     ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), rs.byte("seq"), rs.string("data_type").toLowerCase())
   }
@@ -125,6 +127,14 @@ object ColumnMeta extends Model[ColumnMeta] {
 }
 
 case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, dataType: String) {
-  lazy val serviceColumn = ServiceColumn.findById(columnId)
   lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType)
+  override def equals(other: Any): Boolean = {
+    if (!other.isInstanceOf[ColumnMeta]) false
+    else {
+      val o = other.asInstanceOf[ColumnMeta]
+      //      labelId == o.labelId &&
+      seq == o.seq
+    }
+  }
+  override def hashCode(): Int = seq.toInt
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
index 6636649..4a7e931 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
@@ -191,7 +191,6 @@ case class LabelMeta(id: Option[Int],
                      seq: Byte,
                      defaultValue: String,
                      dataType: String) {
-  lazy val label = Label.findById(labelId)
   lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType)
   override def equals(other: Any): Boolean = {
     if (!other.isInstanceOf[LabelMeta]) false

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index 85b6929..ebbbf88 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -29,12 +29,19 @@ import org.apache.s2graph.core.types.{HBaseType, InnerValLikeWithTs, InnerValLik
 import play.api.libs.json.Json
 import scalikejdbc._
 object ServiceColumn extends Model[ServiceColumn] {
-  val Default = ServiceColumn(Option(HBaseType.DEFAULT_COL_ID), 0, "default", "string", "v4")
+  val Default = ServiceColumn(Option(0), -1, "default", "string", "v4")
 
   def apply(rs: WrappedResultSet): ServiceColumn = {
     ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version"))
   }
 
+//  def findByServiceAndColumn(serviceName: String,
+//                             columnName: String,
+//                             useCache: Boolean  = true)(implicit session: DBSession): Option[ServiceColumn] = {
+//    val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+//    find(service.id.get, columnName, useCache)
+//  }
+
   def findById(id: Int)(implicit session: DBSession = AutoSession): ServiceColumn = {
 //    val cacheKey = s"id=$id"
     val cacheKey = "id=" + id
@@ -95,18 +102,18 @@ object ServiceColumn extends Model[ServiceColumn] {
 case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String)  {
 
   lazy val service = Service.findById(serviceId)
-  lazy val metas = ColumnMeta.findAllByColumn(id.get)
+  lazy val metas = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get) :+ ColumnMeta.lastModifiedAtColumn
   lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta } toMap
   lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap
   lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap
   lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
 
-  def propsToInnerVals(props: Map[String, Any]): Map[Int, InnerValLike] = {
+  def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] = {
     for {
       (k, v) <- props
       labelMeta <- metasInvMap.get(k)
       innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
-    } yield labelMeta.seq.toInt -> innerVal
+    } yield labelMeta -> innerVal
   }
 
   def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index aa018a9..d754bb7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.parsers
 import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException}
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.types.InnerValLike
-import org.apache.s2graph.core.{Edge, GraphUtil}
+import org.apache.s2graph.core.{S2Edge, GraphUtil}
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.utils.logger
 
@@ -33,7 +33,7 @@ import scala.util.parsing.combinator.JavaTokenParsers
 trait ExtractValue {
   val parent = "_parent."
 
-  def propToInnerVal(edge: Edge, key: String) = {
+  def propToInnerVal(edge: S2Edge, key: String) = {
     val (propKey, parentEdge) = findParentEdge(edge, key)
 
     val label = parentEdge.innerLabel
@@ -47,7 +47,7 @@ trait ExtractValue {
     }
   }
 
-  def valueToCompare(edge: Edge, key: String, value: String) = {
+  def valueToCompare(edge: S2Edge, key: String, value: String) = {
     val label = edge.innerLabel
     if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value)
     else {
@@ -65,11 +65,11 @@ trait ExtractValue {
   }
 
   @tailrec
-  private def findParent(edge: Edge, depth: Int): Edge =
+  private def findParent(edge: S2Edge, depth: Int): S2Edge =
     if (depth > 0) findParent(edge.parentEdges.head.edge, depth - 1)
     else edge
 
-  private def findParentEdge(edge: Edge, key: String): (String, Edge) = {
+  private def findParentEdge(edge: S2Edge, key: String): (String, S2Edge) = {
     if (!key.startsWith(parent)) (key, edge)
     else {
       val split = key.split(parent)
@@ -88,9 +88,9 @@ trait Clause extends ExtractValue {
 
   def or(otherField: Clause): Clause = Or(this, otherField)
 
-  def filter(edge: Edge): Boolean
+  def filter(edge: S2Edge): Boolean
 
-  def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: Edge): Boolean = {
+  def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: S2Edge): Boolean = {
     val propValue = propToInnerVal(edge, propKey)
     val compValue = valueToCompare(edge, propKey, value)
 
@@ -105,20 +105,20 @@ object Where {
   }
 }
 case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) {
-  def filter(edge: Edge) =
+  def filter(edge: S2Edge) =
     if (clauses.isEmpty) true else clauses.map(_.filter(edge)).forall(identity)
 }
 
 case class Gt(propKey: String, value: String) extends Clause {
-  override def filter(edge: Edge): Boolean = binaryOp(_ > _)(propKey, value)(edge)
+  override def filter(edge: S2Edge): Boolean = binaryOp(_ > _)(propKey, value)(edge)
 }
 
 case class Lt(propKey: String, value: String) extends Clause {
-  override def filter(edge: Edge): Boolean = binaryOp(_ < _)(propKey, value)(edge)
+  override def filter(edge: S2Edge): Boolean = binaryOp(_ < _)(propKey, value)(edge)
 }
 
 case class Eq(propKey: String, value: String) extends Clause {
-  override def filter(edge: Edge): Boolean = binaryOp(_ == _)(propKey, value)(edge)
+  override def filter(edge: S2Edge): Boolean = binaryOp(_ == _)(propKey, value)(edge)
 }
 
 case class InWithoutParent(label: Label, propKey: String, values: Set[String]) extends Clause {
@@ -144,7 +144,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e
     toInnerVal(value, dataType, label.schemaVersion)
   }
 
-  override def filter(edge: Edge): Boolean = {
+  override def filter(edge: S2Edge): Boolean = {
     if (edge.dir == GraphUtil.directions("in")) {
       val propVal = propToInnerVal(edge, propKey)
       innerValLikeLsIn.contains(propVal)
@@ -156,7 +156,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e
 }
 
 case class IN(propKey: String, values: Set[String]) extends Clause {
-  override def filter(edge: Edge): Boolean = {
+  override def filter(edge: S2Edge): Boolean = {
     val propVal = propToInnerVal(edge, propKey)
     values.exists { value =>
       valueToCompare(edge, propKey, value) == propVal
@@ -165,7 +165,7 @@ case class IN(propKey: String, values: Set[String]) extends Clause {
 }
 
 case class Between(propKey: String, minValue: String, maxValue: String) extends Clause {
-  override def filter(edge: Edge): Boolean = {
+  override def filter(edge: S2Edge): Boolean = {
     val propVal = propToInnerVal(edge, propKey)
     val minVal = valueToCompare(edge, propKey, minValue)
     val maxVal = valueToCompare(edge, propKey, maxValue)
@@ -175,15 +175,15 @@ case class Between(propKey: String, minValue: String, maxValue: String) extends
 }
 
 case class Not(self: Clause) extends Clause {
-  override def filter(edge: Edge) = !self.filter(edge)
+  override def filter(edge: S2Edge) = !self.filter(edge)
 }
 
 case class And(left: Clause, right: Clause) extends Clause {
-  override def filter(edge: Edge) = left.filter(edge) && right.filter(edge)
+  override def filter(edge: S2Edge) = left.filter(edge) && right.filter(edge)
 }
 
 case class Or(left: Clause, right: Clause) extends Clause {
-  override def filter(edge: Edge) = left.filter(edge) || right.filter(edge)
+  override def filter(edge: S2Edge) = left.filter(edge) || right.filter(edge)
 }
 
 object WhereParser {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 13e02a0..62d1e40 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -118,7 +118,7 @@ object RequestParser {
 
 }
 
-class RequestParser(graph: Graph) {
+class RequestParser(graph: S2Graph) {
 
   import Management.JsonModel._
   import RequestParser._
@@ -261,7 +261,7 @@ class RequestParser(graph: Graph) {
       GroupBy(keys)
   }.getOrElse(GroupBy.Empty)
 
-  def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[Vertex] = {
+  def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[S2Vertex] = {
     val vertices = for {
       label <- Label.findByName(labelName).toSeq
       serviceColumn = if (direction == "out") label.srcColumn else label.tgtColumn
@@ -547,12 +547,12 @@ class RequestParser(graph: Graph) {
     elementsWithTsv
   }
 
-  def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(Edge, String)] = {
+  def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = {
     val jsValues = toJsValues(jsValue)
     jsValues.flatMap(toEdgeWithTsv(_, operation))
   }
 
-  private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, String)] = {
+  private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = {
     val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil)
     val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil)
 
@@ -580,7 +580,7 @@ class RequestParser(graph: Graph) {
     toJsValues(jsValue).map(toVertex(_, operation, serviceName, columnName))
   }
 
-  def toVertex(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None): Vertex = {
+  def toVertex(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None): S2Vertex = {
     val id = parse[JsValue](jsValue, "id")
     val ts = parseOption[Long](jsValue, "timestamp").getOrElse(System.currentTimeMillis())
     val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get