You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/11/19 02:28:52 UTC

[09/23] incubator-s2graph git commit: apply MutationHelper and GraphElementBuilder.

apply MutationHelper and GraphElementBuilder.


Branch: refs/heads/master
Commit: aa66822b08d0045e3870af2a9b82523947f553ce
Parents: 937b55a
Author: DO YUNG YOON <>
Authored: Fri Nov 3 21:41:57 2017 +0900
Committer: DO YUNG YOON <>
Committed: Sat Nov 4 07:01:07 2017 +0900

 .../loader/subscriber/GraphSubscriber.scala     |   2 +-
 .../loader/subscriber/TransferToHFile.scala     |  10 +-
 .../s2graph/loader/subscriber/WalLogStat.scala  |   3 +-
 .../loader/subscriber/WalLogToHDFS.scala        |   3 +-
 .../scala/org/apache/s2graph/core/S2Graph.scala | 718 ++-----------------
 .../org/apache/s2graph/core/S2GraphLike.scala   | 238 ++++++
 .../s2graph/core/features/S2Features.scala      |  19 +
 .../apache/s2graph/core/storage/Storage.scala   | 112 +--
 .../hbase/AsynchbaseStorageReadable.scala       |   2 +-
 .../tall/SnapshotEdgeDeserializable.scala       |   2 +-
 .../wide/SnapshotEdgeDeserializable.scala       |   2 +-
 .../core/storage/hbase/IndexEdgeTest.scala      |   4 +-
 12 files changed, 386 insertions(+), 729 deletions(-)
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index 2352cdf..a371b6b 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -106,7 +106,7 @@ object GraphSubscriberHelper extends WithKafka {
                      (statFunc: (String, Int) => Unit): Iterable[GraphElement] = {
     (for (msg <- msgs) yield {
       statFunc("total", 1)
-      g.toGraphElement(msg, labelMapping) match {
+      g.elementBuilder.toGraphElement(msg, labelMapping) match {
         case Some(e) if e.isInstanceOf[S2Edge] =>
           statFunc("EdgeParseOk", 1)
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 618c1bd..a7b4e00 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -84,11 +84,11 @@ object TransferToHFile extends SparkApp {
     } yield output
   def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = {
-    val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
+    val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList { kv => new PutRequest(kv.table, kv.row,, kv.qualifier, kv.value, kv.timestamp) }
   def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = {
-    val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.toList
+    val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList { kv => new PutRequest(kv.table, kv.row,, kv.qualifier, kv.value, kv.timestamp) }
   def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
@@ -104,7 +104,7 @@ object TransferToHFile extends SparkApp {
     val edge = GraphSubscriberHelper.g.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
     edge.edgesWithIndex.flatMap { indexEdge =>
-      GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge) { kv =>
+      GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge) { kv =>
         new PutRequest(kv.table, kv.row,, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp)
@@ -125,7 +125,7 @@ object TransferToHFile extends SparkApp {
   def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
     val kvList = new java.util.ArrayList[KeyValue]
     for (s <- strs) {
-      val elementList = GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq
+      val elementList = GraphSubscriberHelper.g.elementBuilder.toGraphElement(s, labelMapping).toSeq
       for (element <- elementList) {
         if (element.isInstanceOf[S2Edge]) {
           val edge = element.asInstanceOf[S2Edge]
@@ -136,7 +136,7 @@ object TransferToHFile extends SparkApp {
         } else if (element.isInstanceOf[S2Vertex]) {
           val vertex = element.asInstanceOf[S2Vertex]
-          val putRequestList = GraphSubscriberHelper.g.getStorage(vertex.service).vertexSerializer(vertex) { kv =>
+          val putRequestList = GraphSubscriberHelper.g.getStorage(vertex.service).serDe.vertexSerializer(vertex) { kv =>
             new PutRequest(kv.table, kv.row,, kv.qualifier, kv.value, kv.timestamp)
           for (p <- putRequestList) {
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
index 40f936d..eca77f9 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
@@ -21,7 +21,6 @@ package org.apache.s2graph.loader.subscriber
 import kafka.producer.KeyedMessage
 import kafka.serializer.StringDecoder
-import org.apache.s2graph.core.S2Graph$
 import org.apache.s2graph.spark.spark.{WithKafka, SparkApp}
 import org.apache.spark.streaming.Durations._
 import org.apache.spark.streaming.kafka.HasOffsetRanges
@@ -69,7 +68,7 @@ object WalLogStat extends SparkApp with WithKafka {
         val phase = System.getProperty("phase")
         GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) { case (key, msg) =>
-          GraphSubscriberHelper.g.toGraphElement(msg) match {
+          GraphSubscriberHelper.g.elementBuilder.toGraphElement(msg) match {
             case Some(elem) =>
               val serviceName = elem.serviceName
               msg.split("\t", 7) match {
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
index 23c3cda..605a994 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
@@ -22,7 +22,6 @@ package org.apache.s2graph.loader.subscriber
 import java.text.SimpleDateFormat
 import java.util.Date
 import kafka.serializer.StringDecoder
-import org.apache.s2graph.core.S2Graph$
 import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.streaming.Durations._
@@ -92,7 +91,7 @@ object WalLogToHDFS extends SparkApp with WithKafka {
         GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
         partition.flatMap { case (key, msg) =>
-          val optMsg = GraphSubscriberHelper.g.toGraphElement(msg).flatMap { element =>
+          val optMsg = GraphSubscriberHelper.g.elementBuilder.toGraphElement(msg).flatMap { element =>
             val arr = msg.split("\t", 7)
             val service = element.serviceName
             val label = arr(5)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 82d0c6a..5e23f9b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -483,11 +483,11 @@ object S2Graph {
   new Graph.OptOut(test="", method="*", reason="no")
   // all failed.
-class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph {
+class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike {
   import S2Graph._
-  private var apacheConfiguration: Configuration = _
+  var apacheConfiguration: Configuration = _
   def dbSession() = scalikejdbc.AutoSession
@@ -575,6 +575,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   val indexProvider = IndexProvider.apply(config)
+  val elementBuilder = new GraphElementBuilder(this)
   def getStorage(service: Service): Storage = {
     storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
@@ -639,8 +641,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     Try {
       if (q.steps.isEmpty) fallback
       else {
-        val queryOption = q.queryOption
         def fetch: Future[StepResult] = {
           val startStepInnerResult = QueryResult.fromVertices(this, q)
           q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
@@ -795,7 +795,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
       Query(vertices, Vector(step))
-    //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
     val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
       fetchAndDeleteAll(queries, requestTs)
     } { case (allDeleted, deleteSuccess) =>
@@ -832,16 +831,19 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
                               requestTs: Long): Future[(Boolean, Boolean)] = {
     stepInnerResultLs.foreach { stepInnerResult =>
-      logger.error(s"[!!!!!!]: ${stepInnerResult.edgeWithScores.size}")
       if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
     val futures = for {
       stepInnerResult <- stepInnerResultLs
-      deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
-      if deleteStepInnerResult.edgeWithScores.nonEmpty
+      filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
+        (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
+      }
+      edgesToDelete = elementBuilder.buildEdgesToDelete(filtered, requestTs)
+      if edgesToDelete.nonEmpty
     } yield {
-      val head = deleteStepInnerResult.edgeWithScores.head
+      val head = edgesToDelete.head
       val label = head.edge.innerLabel
+      val stepResult = StepResult(edgesToDelete, Nil, Nil, false)
       val ret = label.schemaVersion match {
         case HBaseType.VERSION3 | HBaseType.VERSION4 =>
           if (label.consistencyLevel == "strong") {
@@ -849,9 +851,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
               * read: snapshotEdge on queryResult = O(N)
               * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
-            mutateEdges(, withWait = true).map(_.forall(_.isSuccess))
+            mutateEdges(, withWait = true).map(_.forall(_.isSuccess))
           } else {
-            deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum)
+            getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
         case _ =>
@@ -859,7 +861,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
             * read: x
             * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
-          deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum)
+          getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
@@ -872,71 +874,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
-  private def deleteAllFetchedEdgesAsyncOld(storage: Storage)(stepInnerResult: StepResult,
-                                                         requestTs: Long,
-                                                         retryNum: Int): Future[Boolean] = {
-    if (stepInnerResult.isEmpty) Future.successful(true)
-    else {
-      val head = stepInnerResult.edgeWithScores.head
-      val zkQuorum = head.edge.innerLabel.hbaseZkAddr
-      val futures = for {
-        edgeWithScore <- stepInnerResult.edgeWithScores
-      } yield {
-        val edge = edgeWithScore.edge
-        val score = edgeWithScore.score
-        val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
-        val reversedSnapshotEdgeMutations = storage.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge) = SKeyValue.Put))
-        val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
-        val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
-          storage.indexEdgeSerializer(indexEdge) = SKeyValue.Delete)) ++
-            storage.buildIncrementsAsync(indexEdge, -1L)
-        }
-        /* reverted direction */
-        val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
-        val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
-          storage.indexEdgeSerializer(indexEdge) = SKeyValue.Delete)) ++
-            storage.buildIncrementsAsync(indexEdge, -1L)
-        }
-        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-        storage.writeToStorage(zkQuorum, mutations, withWait = true)
-      }
-      Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
-    }
-  }
-  def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = {
-    val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
-      (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
-    }
-    if (filtered.isEmpty) StepResult.Empty
-    else {
-      val head = filtered.head
-      val label = head.edge.innerLabel
-      val edgeWithScoreLs = { edgeWithScore =>
-          val edge = edgeWithScore.edge
-          val copiedEdge = label.consistencyLevel match {
-            case "strong" =>
-              edge.copyEdge(op = GraphUtil.operations("delete"),
-                version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
-            case _ =>
-              edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
-          }
-        val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
-        //      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
-        edgeToDelete
-      }
-      //Degree edge?
-      StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false)
-    }
-  }
   def mutateElements(elements: Seq[GraphElement],
                      withWait: Boolean = false): Future[Seq[MutateResponse]] = {
@@ -967,8 +904,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
-  //  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
   def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
     val edgeWithIdxs = edges.zipWithIndex
@@ -985,20 +920,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
         val idxs =
         /* multiple edges with weak consistency level will be processed as batch */
-        val mutations = edges.flatMap { edge =>
-          val (_, edgeUpdate) =
-            if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
-            else S2Edge.buildOperation(None, Seq(edge))
-          val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
-          if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
-          storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-        }
-        storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>
- => idx -> ret.isSuccess)
-        }
+        storage.mutateWeakEdges(zkQuorum, edges, withWait)
@@ -1012,7 +934,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
       val edges =
       val idxs =
       val storage = getStorage(label)
-      mutateStrongEdges(storage)(edges, withWait = true).map { rets =>
+      val zkQuorum = label.hbaseZkAddr
+      storage.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
@@ -1026,119 +949,24 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
-  private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[Boolean]] = {
-    val edgeWithIdxs = _edges.zipWithIndex
-    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
-      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
-    } toSeq
-    val mutateEdges = { case ((_, _, _), edgeGroup) =>
-      val edges =
-      val idxs =
-      // After deleteAll, process others
-      val mutateEdgeFutures = edges.toList match {
-        case head :: tail =>
-          val edgeFuture = mutateEdgesInner(storage)(edges, checkConsistency = true , withWait)
-          //TODO: decide what we will do on failure on vertex put
-          val puts = storage.buildVertexPutsAsync(head)
-          val vertexFuture = storage.writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
-          Seq(edgeFuture, vertexFuture)
-        case Nil => Nil
-      }
-      val composed = for {
-      //        deleteRet <- Future.sequence(deleteAllFutures)
-        mutateRet <- Future.sequence(mutateEdgeFutures)
-      } yield mutateRet
- { ret => idx => idx -> ret) }
-    }
-    Future.sequence(mutateEdges).map { squashedRets =>
-      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
-    }
-  }
-  private def mutateEdgesInner(storage: Storage)(edges: Seq[S2EdgeLike],
-                       checkConsistency: Boolean,
-                       withWait: Boolean): Future[MutateResponse] = {
-    assert(edges.nonEmpty)
-    // TODO:: remove after code review: unreachable code
-    if (!checkConsistency) {
-      val zkQuorum = edges.head.innerLabel.hbaseZkAddr
-      val futures = { edge =>
-        val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
-        val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
-        val mutations =
-          storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-        if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
-        storage.writeToStorage(zkQuorum, mutations, withWait)
-      }
-      Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
-    } else {
-      storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
-        storage.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
-      }
-    }
-  }
   def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
-    def mutateVertex(storage: Storage)(vertex: S2VertexLike, withWait: Boolean): Future[MutateResponse] = {
-      if (vertex.op == GraphUtil.operations("delete")) {
-        storage.writeToStorage(vertex.hbaseZkAddr,
-          storage.vertexSerializer(vertex) = SKeyValue.Delete)), withWait)
-      } else if (vertex.op == GraphUtil.operations("deleteAll")) {
-"deleteAll for vertex is truncated. $vertex")
-        Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
-      } else {
-        storage.writeToStorage(vertex.hbaseZkAddr, storage.buildPutsAll(vertex), withWait)
-      }
-    }
-    def mutateVertices(storage: Storage)(vertices: Seq[S2VertexLike],
-                       withWait: Boolean = false): Future[Seq[MutateResponse]] = {
-      val futures = { vertex => mutateVertex(storage)(vertex, withWait) }
+    def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike],
+                                         withWait: Boolean = false): Future[Seq[MutateResponse]] = {
+      val futures = { vertex => storage.mutateVertex(zkQuorum, vertex, withWait) }
     val verticesWithIdx = vertices.zipWithIndex
     val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
-      mutateVertices(getStorage(service))(, withWait).map(
+      mutateVertices(getStorage(service))(service.cluster,, withWait).map(
     Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
   def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
-    def incrementCounts(storage: Storage)(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
-      val futures = for {
-        edge <- edges
-      } yield {
-        val kvs = for {
-          relEdge <- edge.relatedEdges
-          edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
-        } yield {
-          val countWithTs = edge.propertyValueInner(LabelMeta.count)
-          val countVal = countWithTs.innerVal.toString().toLong
-          storage.buildIncrementsCountAsync(edgeWithIndex, countVal).head
-        }
-        storage.writeToStorage(edge.innerLabel.hbaseZkAddr, kvs, withWait = withWait)
-      }
-      Future.sequence(futures)
-    }
     val edgesWithIdx = edges.zipWithIndex
     val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
-      incrementCounts(getStorage(label))(, withWait).map(
+      getStorage(label).incrementCounts(label.hbaseZkAddr,, withWait).map(
     Future.sequence(futures).map { ls =>
@@ -1147,11 +975,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = {
     val label = edge.innerLabel
     val storage = getStorage(label)
-    val kvs = storage.buildDegreePuts(edge, degreeVal)
-    storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = true)
+    storage.updateDegree(label.hbaseZkAddr, edge, degreeVal)
   def isRunning(): Boolean = running.get()
@@ -1164,166 +990,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
-  def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
-    val parts = GraphUtil.split(s)
-    val logType = parts(2)
-    val element = if (logType == "edge" | logType == "e") {
-      /* current only edge is considered to be bulk loaded */
-      labelMapping.get(parts(5)) match {
-        case None =>
-        case Some(toReplace) =>
-          parts(5) = toReplace
-      }
-      toEdge(parts)
-    } else if (logType == "vertex" | logType == "v") {
-      toVertex(parts)
-    } else {
-      throw new GraphExceptions.JsonParseException("log type is not exist in log.")
-    }
-    element
-  } recover {
-    case e: Exception =>
-      logger.error(s"[toElement]: $e", e)
-      None
-  } get
-  def toVertex(s: String): Option[S2VertexLike] = {
-    toVertex(GraphUtil.split(s))
-  }
-  def toEdge(s: String): Option[S2EdgeLike] = {
-    toEdge(GraphUtil.split(s))
-  }
-  def toEdge(parts: Array[String]): Option[S2EdgeLike] = Try {
-    val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
-    val tempDirection = if (parts.length >= 8) parts(7) else "out"
-    val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
-    val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
-    Option(edge)
-  } recover {
-    case e: Exception =>
-      logger.error(s"[toEdge]: $e", e)
-      throw e
-  } get
-  def toVertex(parts: Array[String]): Option[S2VertexLike] = Try {
-    val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
-    val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
-    Option(vertex)
-  } recover {
-    case e: Throwable =>
-      logger.error(s"[toVertex]: $e", e)
-      throw e
-  } get
-  def toEdge(srcId: Any,
-             tgtId: Any,
-             labelName: String,
-             direction: String,
-             props: Map[String, Any] = Map.empty,
-             ts: Long = System.currentTimeMillis(),
-             operation: String = "insert"): S2EdgeLike = {
-    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
-    val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn
-    val tgtColumn = if (direction == "out") label.tgtColumn else label.srcColumn
-    val srcVertexIdInnerVal = toInnerVal(srcId, srcColumn.columnType, label.schemaVersion)
-    val tgtVertexIdInnerVal = toInnerVal(tgtId, tgtColumn.columnType, label.schemaVersion)
-    val srcVertex = newVertex(SourceVertexId(srcColumn, srcVertexIdInnerVal), System.currentTimeMillis())
-    val tgtVertex = newVertex(TargetVertexId(tgtColumn, tgtVertexIdInnerVal), System.currentTimeMillis())
-    val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
-    val propsPlusTs = props ++ Map( -> ts)
-    val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
-    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
-    new S2Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs)
-  }
-  def toVertex(serviceName: String,
-               columnName: String,
-               id: Any,
-               props: Map[String, Any] = Map.empty,
-               ts: Long = System.currentTimeMillis(),
-               operation: String = "insert"): S2VertexLike = {
-    val service = Service.findByName(serviceName).getOrElse(throw new java.lang.IllegalArgumentException(s"$serviceName is not found."))
-    val column = ServiceColumn.find(, columnName).getOrElse(throw new java.lang.IllegalArgumentException(s"$columnName is not found."))
-    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
-    val srcVertexId = id match {
-      case vid: VertexId => id.asInstanceOf[VertexId]
-      case _ => VertexId(column, toInnerVal(id, column.columnType, column.schemaVersion))
-    }
-    val propsInner = column.propsToInnerVals(props) ++
-      Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion))
-    val vertex = new S2Vertex(this, srcVertexId, ts, S2Vertex.EmptyProps, op)
-    S2Vertex.fillPropsWithTs(vertex, propsInner)
-    vertex
-  }
-  def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2EdgeLike = {
-    val srcVertex = queryRequest.vertex
-    val queryParam = queryRequest.queryParam
-    val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
-    val label = queryParam.label
-    val labelWithDir = queryParam.labelWithDir
-    val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
-    val propsWithTs = label.EmptyPropsWithTs
-    tgtVertexIdOpt match {
-      case Some(tgtVertexId) => // _to is given.
-        /* we use toSnapshotEdge so dont need to swap src, tgt */
-        val src = srcVertex.innerId
-        val tgt = tgtVertexId
-        val (srcVId, tgtVId) = (SourceVertexId(srcColumn, src), TargetVertexId(tgtColumn, tgt))
-        val (srcV, tgtV) = (newVertex(srcVId), newVertex(tgtVId))
-        newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
-      case None =>
-        val src = srcVertex.innerId
-        val srcVId = SourceVertexId(srcColumn, src)
-        val srcV = newVertex(srcVId)
-        newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
-    }
-  }
-  /**
-   * helper to create new Edge instance from given parameters on memory(not actually stored in storage).
-   *
-   * Since we are using mutable map for property value(propsWithTs),
-   * we should make sure that reference for mutable map never be shared between multiple Edge instances.
-   * To guarantee this, we never create Edge directly, but rather use this helper which is available on S2Graph.
-   *
-   * Note that we are using following convention
-   * 1. `add*` for method that actually store instance into storage,
-   * 2. `new*` for method that only create instance on memory, but not store it into storage.
-   *
-   * @param srcVertex
-   * @param tgtVertex
-   * @param innerLabel
-   * @param dir
-   * @param op
-   * @param version
-   * @param propsWithTs
-   * @param parentEdges
-   * @param originalEdgeOpt
-   * @param pendingEdgeOpt
-   * @param statusCode
-   * @param lockTs
-   * @param tsInnerValOpt
-   * @return
-   */
   def newEdge(srcVertex: S2VertexLike,
               tgtVertex: S2VertexLike,
               innerLabel: Label,
@@ -1336,89 +1003,22 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
               pendingEdgeOpt: Option[S2EdgeLike] = None,
               statusCode: Byte = 0,
               lockTs: Option[Long] = None,
-              tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = {
-    val edge = S2Edge(
-      this,
-      srcVertex,
-      tgtVertex,
-      innerLabel,
-      dir,
-      op,
-      version,
-      S2Edge.EmptyProps,
-      parentEdges,
-      originalEdgeOpt,
-      pendingEdgeOpt,
-      statusCode,
-      lockTs,
-      tsInnerValOpt)
-    S2Edge.fillPropsWithTs(edge, propsWithTs)
-    edge
-  }
-  /**
-   * helper to create new SnapshotEdge instance from given parameters on memory(not actually stored in storage).
-   *
-   * Note that this is only available to S2Graph, not structure.Graph so only internal code should use this method.
-   * @param srcVertex
-   * @param tgtVertex
-   * @param label
-   * @param dir
-   * @param op
-   * @param version
-   * @param propsWithTs
-   * @param pendingEdgeOpt
-   * @param statusCode
-   * @param lockTs
-   * @param tsInnerValOpt
-   * @return
-   */
-  private[core] def newSnapshotEdge(srcVertex: S2VertexLike,
-                                    tgtVertex: S2VertexLike,
-                                    label: Label,
-                                    dir: Int,
-                                    op: Byte,
-                                    version: Long,
-                                    propsWithTs: S2Edge.State,
-                                    pendingEdgeOpt: Option[S2EdgeLike],
-                                    statusCode: Byte = 0,
-                                    lockTs: Option[Long],
-                                    tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
-    val snapshotEdge = new SnapshotEdge(this, srcVertex, tgtVertex, label, dir, op, version, S2Edge.EmptyProps,
-      pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
-    S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs)
-    snapshotEdge
-  }
-  def newVertexId(serviceName: String)(columnName: String)(id: Any): VertexId = {
-    val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
-    val column = ServiceColumn.find(, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
-    newVertexId(service, column, id)
-  }
+              tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike =
+    elementBuilder.newEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs,
+      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
-  /**
-   * helper to create S2Graph's internal VertexId instance with given parameters.
-   * @param service
-   * @param column
-   * @param id
-   * @return
-   */
   def newVertexId(service: Service,
                   column: ServiceColumn,
-                  id: Any): VertexId = {
-    val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(id)(column.schemaVersion)
-    new VertexId(column, innerVal)
-  }
+                  id: Any): VertexId =
+    elementBuilder.newVertexId(service, column, id)
   def newVertex(id: VertexId,
                 ts: Long = System.currentTimeMillis(),
                 props: S2Vertex.Props = S2Vertex.EmptyProps,
                 op: Byte = 0,
-                belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
-    val vertex = new S2Vertex(this, id, ts, S2Vertex.EmptyProps, op, belongLabelIds)
-    S2Vertex.fillPropsWithTs(vertex, props)
-    vertex
-  }
+                belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike =
+    elementBuilder.newVertex(id, ts, props, op, belongLabelIds)
   def getVertex(vertexId: VertexId): Option[S2VertexLike] = {
     val v = newVertex(vertexId)
@@ -1441,248 +1041,26 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
       stepResultLs.foreach(_.edgeWithScores.foreach(es => ls.add(es.edge)))
-//    getEdges(query).map { stepResult =>
-//      val ls = new util.ArrayList[Edge]()
-//      stepResult.edgeWithScores.foreach(es => ls.add(es.edge))
-//      ls.iterator()
-//    }
-  }
-  /**
-   * used by graph.traversal().V()
-   * @param ids: array of VertexId values. note that last parameter can be used to control if actually fetch vertices from storage or not.
-   *                 since S2Graph use user-provided id as part of edge, it is possible to
-   *                 fetch edge without fetch start vertex. default is false which means we are not fetching vertices from storage.
-   * @return
-   */
-  override def vertices(ids: AnyRef*): util.Iterator[structure.Vertex] = {
-    val fetchVertices = { lastParam =>
-      if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean]
-      else true
-    }.getOrElse(true)
-    if (ids.isEmpty) {
-      //TODO: default storage need to be fixed.
-      Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator
-    } else {
-      val vertices = ids.collect {
-        case s2Vertex: S2VertexLike => s2Vertex
-        case vId: VertexId => newVertex(vId)
-        case vertex: Vertex => newVertex([VertexId])
-        case other @ _ => newVertex(VertexId.fromString(other.toString))
-      }
-      if (fetchVertices) {
-        val future = getVertices(vertices).map { vs =>
-          val ls = new util.ArrayList[structure.Vertex]()
-          ls.addAll(vs)
-          ls.iterator()
-        }
-        Await.result(future, WaitTimeout)
-      } else {
-        vertices.iterator
-      }
-    }
-  }
-  override def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
-    if (edgeIds.isEmpty) {
-      // FIXME
-      Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
-    } else {
-      Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
-    }
-  }
-  def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = {
-    val s2EdgeIds = edgeIds.collect {
-      case s2Edge: S2EdgeLike =>[EdgeId]
-      case id: EdgeId => id
-      case s: String => EdgeId.fromString(s)
-    }
-    val edgesToFetch = for {
-      id <- s2EdgeIds
-    } yield {
-        toEdge(id.srcVertexId, id.tgtVertexId, id.labelName, id.direction)
-      }
-    checkEdges(edgesToFetch).map { stepResult =>
-      val ls = new util.ArrayList[structure.Edge]
-      stepResult.edgeWithScores.foreach { es => ls.add(es.edge) }
-      ls.iterator()
-    }
-  }
-  override def tx(): Transaction = {
-    if (!features.graph.supportsTransactions) throw Graph.Exceptions.transactionsNotSupported
-    ???
-  }
-  override def variables(): Variables = new S2GraphVariables
-  override def configuration(): Configuration = apacheConfiguration
-  override def addVertex(label: String): Vertex = {
-    if (label == null) throw Element.Exceptions.labelCanNotBeNull
-    if (label.isEmpty) throw Element.Exceptions.labelCanNotBeEmpty
-    addVertex(Seq(T.label, label): _*)
-  }
-  def makeVertex(idValue: AnyRef, kvsMap: Map[String, AnyRef]): S2VertexLike = {
-    idValue match {
-      case vId: VertexId =>
-        toVertex(vId.column.service.serviceName, vId.column.columnName, vId, kvsMap)
-      case _ =>
-        val serviceColumnNames = kvsMap.getOrElse(T.label.toString, DefaultColumnName).toString
-        val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter)
-        val (serviceName, columnName) =
-          if (names.length == 1) (DefaultServiceName, names(0))
-          else throw new RuntimeException("malformed data on vertex label.")
-        toVertex(serviceName, columnName, idValue, kvsMap)
-    }
-  }
-  override def addVertex(kvs: AnyRef*): structure.Vertex = {
-    if (!features().vertex().supportsUserSuppliedIds() && kvs.contains( {
-      throw Vertex.Exceptions.userSuppliedIdsNotSupported
-    }
-    val kvsMap = S2Property.kvsToProps(kvs)
-    kvsMap.get( match {
-      case Some(idValue) if !S2Property.validType(idValue) =>
-        throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported()
-      case _ =>
-    }
-    kvsMap.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
-    if (kvsMap.contains( && kvsMap(
-      throw Element.Exceptions.labelCanNotBeEmpty
-    val vertex = kvsMap.get( match {
-      case None => // do nothing
-        val id = nextLocalLongId
-        makeVertex(, kvsMap)
-      case Some(idValue) if S2Property.validType(idValue) =>
-        makeVertex(idValue, kvsMap)
-      case _ =>
-        throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported
-    }
-    addVertexInner(vertex)
-    vertex
-  }
-  def addVertex(id: VertexId,
-                ts: Long = System.currentTimeMillis(),
-                props: S2Vertex.Props = S2Vertex.EmptyProps,
-                op: Byte = 0,
-                belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
-    val vertex = newVertex(id, ts, props, op, belongLabelIds)
-    val future = mutateVertices(Seq(vertex), withWait = true).map { rets =>
-      if (rets.forall(_.isSuccess)) vertex
-      else throw new RuntimeException("addVertex failed.")
-    }
-    Await.ready(future, WaitTimeout)
-    vertex
-  }
-  def addVertexInner(vertex: S2VertexLike): S2VertexLike = {
-    val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
-      if (rets.forall(_.isSuccess)) {
-        indexProvider.mutateVerticesAsync(Seq(vertex))
-      } else throw new RuntimeException("addVertex failed.")
-    }
-    Await.ready(future, WaitTimeout)
-    vertex
-  }
-  /* tp3 only */
-  def addEdge(srcVertex: S2VertexLike, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = {
-    val containsId = kvs.contains(
-    tgtVertex match {
-      case otherV: S2VertexLike =>
-        if (!features().edge().supportsUserSuppliedIds() && containsId) {
-          throw Exceptions.userSuppliedIdsNotSupported()
-        }
-        val props = S2Property.kvsToProps(kvs)
-        props.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
-        //TODO: direction, operation, _timestamp need to be reserved property key.
-        try {
-          val direction = props.get("direction").getOrElse("out").toString
-          val ts = props.get(
-          val operation = props.get("operation").map(_.toString).getOrElse("insert")
-          val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
-          val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
-          val propsPlusTs = props ++ Map( -> ts)
-          val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
-          val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
-          val edge = newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
-          val future = mutateEdges(Seq(edge), withWait = true).flatMap { rets =>
-            indexProvider.mutateEdgesAsync(Seq(edge))
-          }
-          Await.ready(future, WaitTimeout)
-          edge
-        } catch {
-          case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e)
-        }
-      case null => throw new java.lang.IllegalArgumentException
-      case _ => throw new RuntimeException("only S2Graph vertex can be used.")
-    }
-  override def close(): Unit = {
-    shutdown()
-  }
-  override def compute[C <: GraphComputer](aClass: Class[C]): C = ???
-  override def compute(): GraphComputer = {
-    if (!features.graph.supportsComputer) {
-      throw Graph.Exceptions.graphComputerNotSupported
-    }
-    ???
-  }
-  class S2GraphFeatures extends Features {
-    import org.apache.s2graph.core.{features => FS}
-    override def edge(): Features.EdgeFeatures = new FS.S2EdgeFeatures
-    override def graph(): Features.GraphFeatures = new FS.S2GraphFeatures
-    override def supports(featureClass: Class[_ <: Features.FeatureSet], feature: String): Boolean =
-      super.supports(featureClass, feature)
-    override def vertex(): Features.VertexFeatures = new FS.S2VertexFeatures
-    override def toString: String = {
-      s"FEATURES:\nEdgeFeatures:${edge}\nGraphFeatures:${graph}\nVertexFeatures:${vertex}"
-    }
-  }
-  private val s2Features = new S2GraphFeatures
-  override def features() = s2Features
-  override def toString(): String = "[s2graph]"
+  def toVertex(serviceName: String,
+               columnName: String,
+               id: Any,
+               props: Map[String, Any] = Map.empty,
+               ts: Long = System.currentTimeMillis(),
+               operation: String = "insert"): S2VertexLike =
+    elementBuilder.toVertex(serviceName, columnName, id, props, ts, operation)
-  override def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = {
-    builder.graph(this).registry(S2GraphIoRegistry.instance).create().asInstanceOf[I]
-  }
+  def toEdge(srcId: Any,
+             tgtId: Any,
+             labelName: String,
+             direction: String,
+             props: Map[String, Any] = Map.empty,
+             ts: Long = System.currentTimeMillis(),
+             operation: String = "insert"): S2EdgeLike =
+    elementBuilder.toEdge(srcId, tgtId, labelName, direction, props, ts, operation)
+  def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] =
+    elementBuilder.toGraphElement(s, labelMapping)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
new file mode 100644
index 0000000..03a92c6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -0,0 +1,238 @@
+package org.apache.s2graph.core
+import java.util
+import org.apache.commons.configuration.Configuration
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
+import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
+import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.types.VertexId
+import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions
+import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables}
+import{GraphReader, GraphWriter, Io, Mapper}
+import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex}
+import scala.concurrent.{Await, Future}
+import scala.collection.JavaConversions._
+trait S2GraphLike extends Graph {
+  this: S2Graph =>
+  var apacheConfiguration: Configuration
+  private val s2Features = new S2Features
+  override def features() = s2Features
+  def vertices(ids: AnyRef*): util.Iterator[structure.Vertex] = {
+    val fetchVertices = { lastParam =>
+      if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean]
+      else true
+    }.getOrElse(true)
+    if (ids.isEmpty) {
+      //TODO: default storage need to be fixed.
+      Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator
+    } else {
+      val vertices = ids.collect {
+        case s2Vertex: S2VertexLike => s2Vertex
+        case vId: VertexId => newVertex(vId)
+        case vertex: Vertex => newVertex([VertexId])
+        case other@_ => newVertex(VertexId.fromString(other.toString))
+      }
+      if (fetchVertices) {
+        val future = getVertices(vertices).map { vs =>
+          val ls = new util.ArrayList[structure.Vertex]()
+          ls.addAll(vs)
+          ls.iterator()
+        }
+        Await.result(future, WaitTimeout)
+      } else {
+        vertices.iterator
+      }
+    }
+  }
+  def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
+    if (edgeIds.isEmpty) {
+      // FIXME
+      Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
+    } else {
+      Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
+    }
+  }
+  def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = {
+    val s2EdgeIds = edgeIds.collect {
+      case s2Edge: S2EdgeLike =>[EdgeId]
+      case id: EdgeId => id
+      case s: String => EdgeId.fromString(s)
+    }
+    val edgesToFetch = for {
+      id <- s2EdgeIds
+    } yield {
+      elementBuilder.toEdge(id.srcVertexId, id.tgtVertexId, id.labelName, id.direction)
+    }
+    checkEdges(edgesToFetch).map { stepResult =>
+      val ls = new util.ArrayList[structure.Edge]
+      stepResult.edgeWithScores.foreach { es => ls.add(es.edge) }
+      ls.iterator()
+    }
+  }
+  def tx(): Transaction = {
+    if (!features.graph.supportsTransactions) throw Graph.Exceptions.transactionsNotSupported
+    ???
+  }
+  def variables(): Variables = new S2GraphVariables
+  def configuration(): Configuration = apacheConfiguration
+  def addVertex(label: String): Vertex = {
+    if (label == null) throw Element.Exceptions.labelCanNotBeNull
+    if (label.isEmpty) throw Element.Exceptions.labelCanNotBeEmpty
+    addVertex(Seq(T.label, label): _*)
+  }
+  def makeVertex(idValue: AnyRef, kvsMap: Map[String, AnyRef]): S2VertexLike = {
+    idValue match {
+      case vId: VertexId =>
+        elementBuilder.toVertex(vId.column.service.serviceName, vId.column.columnName, vId, kvsMap)
+      case _ =>
+        val serviceColumnNames = kvsMap.getOrElse(T.label.toString, DefaultColumnName).toString
+        val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter)
+        val (serviceName, columnName) =
+          if (names.length == 1) (DefaultServiceName, names(0))
+          else throw new RuntimeException("malformed data on vertex label.")
+        elementBuilder.toVertex(serviceName, columnName, idValue, kvsMap)
+    }
+  }
+  def addVertex(kvs: AnyRef*): structure.Vertex = {
+    if (!features().vertex().supportsUserSuppliedIds() && kvs.contains( {
+      throw Vertex.Exceptions.userSuppliedIdsNotSupported
+    }
+    val kvsMap = S2Property.kvsToProps(kvs)
+    kvsMap.get( match {
+      case Some(idValue) if !S2Property.validType(idValue) =>
+        throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported()
+      case _ =>
+    }
+    kvsMap.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
+    if (kvsMap.contains( && kvsMap(
+      throw Element.Exceptions.labelCanNotBeEmpty
+    val vertex = kvsMap.get( match {
+      case None => // do nothing
+        val id = nextLocalLongId
+        makeVertex(, kvsMap)
+      case Some(idValue) if S2Property.validType(idValue) =>
+        makeVertex(idValue, kvsMap)
+      case _ =>
+        throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported
+    }
+    addVertexInner(vertex)
+    vertex
+  }
+  def addVertex(id: VertexId,
+                ts: Long = System.currentTimeMillis(),
+                props: S2Vertex.Props = S2Vertex.EmptyProps,
+                op: Byte = 0,
+                belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
+    val vertex = newVertex(id, ts, props, op, belongLabelIds)
+    val future = mutateVertices(Seq(vertex), withWait = true).map { rets =>
+      if (rets.forall(_.isSuccess)) vertex
+      else throw new RuntimeException("addVertex failed.")
+    }
+    Await.ready(future, WaitTimeout)
+    vertex
+  }
+  def addVertexInner(vertex: S2VertexLike): S2VertexLike = {
+    val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
+      if (rets.forall(_.isSuccess)) {
+        indexProvider.mutateVerticesAsync(Seq(vertex))
+      } else throw new RuntimeException("addVertex failed.")
+    }
+    Await.ready(future, WaitTimeout)
+    vertex
+  }
+  /* tp3 only */
+  def addEdge(srcVertex: S2VertexLike, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = {
+    val containsId = kvs.contains(
+    tgtVertex match {
+      case otherV: S2VertexLike =>
+        if (!features().edge().supportsUserSuppliedIds() && containsId) {
+          throw Exceptions.userSuppliedIdsNotSupported()
+        }
+        val props = S2Property.kvsToProps(kvs)
+        props.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
+        //TODO: direction, operation, _timestamp need to be reserved property key.
+        try {
+          val direction = props.get("direction").getOrElse("out").toString
+          val ts = props.get(
+          val operation = props.get("operation").map(_.toString).getOrElse("insert")
+          val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+          val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+          val propsPlusTs = props ++ Map( -> ts)
+          val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+          val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+          val edge = newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
+          val future = mutateEdges(Seq(edge), withWait = true).flatMap { rets =>
+            indexProvider.mutateEdgesAsync(Seq(edge))
+          }
+          Await.ready(future, WaitTimeout)
+          edge
+        } catch {
+          case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e)
+        }
+      case null => throw new java.lang.IllegalArgumentException
+      case _ => throw new RuntimeException("only S2Graph vertex can be used.")
+    }
+  }
+  def close(): Unit = {
+    shutdown()
+  }
+  def compute[C <: GraphComputer](aClass: Class[C]): C = ???
+  def compute(): GraphComputer = {
+    if (!features.graph.supportsComputer) {
+      throw Graph.Exceptions.graphComputerNotSupported
+    }
+    ???
+  }
+  def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = {
+    builder.graph(this).registry(S2GraphIoRegistry.instance).create().asInstanceOf[I]
+  }
+  override def toString(): String = "[s2graph]"
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala b/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala
new file mode 100644
index 0000000..36c9ecc
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala
@@ -0,0 +1,19 @@
+package org.apache.s2graph.core.features
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+class S2Features extends Features {
+  import org.apache.s2graph.core.{features => FS}
+  override def edge(): Features.EdgeFeatures = new FS.S2EdgeFeatures
+  override def graph(): Features.GraphFeatures = new FS.S2GraphFeatures
+  override def supports(featureClass: Class[_ <: Features.FeatureSet], feature: String): Boolean =
+    super.supports(featureClass, feature)
+  override def vertex(): Features.VertexFeatures = new FS.S2VertexFeatures
+  override def toString: String = {
+    s"FEATURES:\nEdgeFeatures:${edge}\nGraphFeatures:${graph}\nVertexFeatures:${vertex}"
+  }
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index e6075ec..2a8f1e2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -22,9 +22,10 @@ package
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
+import{Deserializable, MutationHelper}
 import org.apache.s2graph.core.types._
 import scala.concurrent.{ExecutionContext, Future}
 abstract class Storage(val graph: S2Graph,
@@ -61,49 +62,51 @@ abstract class Storage(val graph: S2Graph,
   lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher)
-  /** IO **/
-  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] =
-    serDe.snapshotEdgeSerializer(snapshotEdge)
-  def indexEdgeSerializer(indexEdge: IndexEdge): serde.Serializable[IndexEdge] =
-    serDe.indexEdgeSerializer(indexEdge)
-  def vertexSerializer(vertex: S2VertexLike): serde.Serializable[S2VertexLike] =
-    serDe.vertexSerializer(vertex)
-  def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] =
-    serDe.snapshotEdgeDeserializer(schemaVer)
-  def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable =
-    serDe.indexEdgeDeserializer(schemaVer)
-  def vertexDeserializer(schemaVer: String): Deserializable[S2VertexLike] =
-    serDe.vertexDeserializer(schemaVer)
+  lazy val mutationHelper: MutationHelper = new MutationHelper(this)
+//  /** IO **/
+//  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] =
+//    serDe.snapshotEdgeSerializer(snapshotEdge)
+//  def indexEdgeSerializer(indexEdge: IndexEdge): serde.Serializable[IndexEdge] =
+//    serDe.indexEdgeSerializer(indexEdge)
+//  def vertexSerializer(vertex: S2Vertex): serde.Serializable[S2Vertex] =
+//    serDe.vertexSerializer(vertex)
+//  def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] =
+//    serDe.snapshotEdgeDeserializer(schemaVer)
+//  def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable =
+//    serDe.indexEdgeDeserializer(schemaVer)
+//  def vertexDeserializer(schemaVer: String): Deserializable[S2Vertex] =
+//    serDe.vertexDeserializer(schemaVer)
   /** Mutation Builder */
-  def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) =
-    io.increments(edgeMutate)
-  def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
-    io.indexedEdgeMutations(edgeMutate)
-  def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
-    io.buildIncrementsAsync(indexedEdge, amount)
-  def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
-    io.buildIncrementsCountAsync(indexedEdge, amount)
-  def buildVertexPutsAsync(edge: S2EdgeLike): Seq[SKeyValue] =
-    io.buildVertexPutsAsync(edge)
-  def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
-    io.snapshotEdgeMutations(edgeMutate)
-  def buildDegreePuts(edge: S2EdgeLike, degreeVal: Long): Seq[SKeyValue] =
-    io.buildDegreePuts(edge, degreeVal)
-  def buildPutsAll(vertex: S2VertexLike): Seq[SKeyValue] =
-    io.buildPutsAll(vertex)
+//  def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) =
+//    io.increments(edgeMutate)
+//  def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
+//    io.indexedEdgeMutations(edgeMutate)
+//  def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
+//    io.buildIncrementsAsync(indexedEdge, amount)
+//  def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
+//    io.buildIncrementsCountAsync(indexedEdge, amount)
+//  def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] =
+//    io.buildVertexPutsAsync(edge)
+//  def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
+//    io.snapshotEdgeMutations(edgeMutate)
+//  def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] =
+//    io.buildDegreePuts(edge, degreeVal)
+//  def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] =
+//    io.buildPutsAll(vertex)
   /** Mutation **/
@@ -129,8 +132,8 @@ abstract class Storage(val graph: S2Graph,
   /** Conflict Resolver **/
-  def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] =
-    conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt)
+//  def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] =
+//    conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt)
   /** Management **/
@@ -145,4 +148,25 @@ abstract class Storage(val graph: S2Graph,
   def shutdown(): Unit = management.shutdown()
   def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName)
+  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+                                    requestTs: Long,
+                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] =
+    mutationHelper.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum)
+  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+    mutationHelper.mutateVertex(zkQuorum: String, vertex, withWait)
+  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] =
+    mutationHelper.mutateStrongEdges(zkQuorum, _edges, withWait)
+  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] =
+    mutationHelper.mutateWeakEdges(zkQuorum, _edges, withWait)
+  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] =
+    mutationHelper.incrementCounts(zkQuorum, edges, withWait)
+  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] =
+    mutationHelper.updateDegree(zkQuorum, edge, degreeVal)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index 6be5f60..bdb6e99 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -303,7 +303,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
     val cacheTTL = queryParam.cacheTTLInMillis
     /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
-    val edge = graph.toRequestEdge(queryRequest, parentEdges)
+    val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
     val request = buildRequest(queryRequest, edge)
     val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index b618962..408822a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -97,7 +97,7 @@ class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[Snapshot
-        val snapshotEdge = graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
+        val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
           label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode,
           pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index 8c961ce..ff8eb80 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -88,7 +88,7 @@ class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[Snapshot
-        val snapshotEdge = graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
+        val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
           label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode,
           pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
index 0cbaa81..24e98a1 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
@@ -46,8 +46,8 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels {
     val labelOpt = Option(l)
     val edge = graph.newEdge(vertex, tgtVertex, l, labelWithDir.dir, 0, ts, props, tsInnerValOpt = Option(InnerVal.withLong(ts, l.schemaVersion)))
     val indexEdge = edge.edgesWithIndex.find(_.labelIndexSeq == LabelIndex.DefaultSeq).head
-    val kvs = graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues
-    val _indexEdgeOpt = graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(kvs, None)
+    val kvs = graph.getStorage(l).serDe.indexEdgeSerializer(indexEdge).toKeyValues
+    val _indexEdgeOpt = graph.getStorage(l).serDe.indexEdgeDeserializer(l.schemaVersion).fromKeyValues(kvs, None)
     _indexEdgeOpt should not be empty
     edge == _indexEdgeOpt.get should be(true)