You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/11/19 02:29:02 UTC

[19/23] incubator-s2graph git commit: remove newVertex/newEdge/newVertexId on S2GraphLike.

remove newVertex/newEdge/newVertexId on S2GraphLike.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/eabe7570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/eabe7570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/eabe7570

Branch: refs/heads/master
Commit: eabe7570a9ef9ec0cb7b36d518d5bef4d1fd53dd
Parents: 6403bc9
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 10 22:57:51 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Nov 11 10:40:19 2017 +0900

----------------------------------------------------------------------
 .../loader/subscriber/GraphSubscriber.scala     |  67 +----
 .../loader/subscriber/TransferToHFile.scala     |   4 +-
 .../org/apache/s2graph/core/QueryResult.scala   |   2 +-
 .../scala/org/apache/s2graph/core/S2Edge.scala  |   8 +-
 .../org/apache/s2graph/core/S2EdgeBuilder.scala |   2 +-
 .../scala/org/apache/s2graph/core/S2Graph.scala | 256 ++-----------------
 .../org/apache/s2graph/core/S2GraphLike.scala   |  45 +---
 .../org/apache/s2graph/core/S2GraphTp.scala     |  98 +++++++
 .../org/apache/s2graph/core/S2VertexLike.scala  |   3 +-
 .../apache/s2graph/core/TraversalHelper.scala   | 153 ++++++++++-
 .../s2graph/core/rest/RequestParser.scala       |   2 +-
 .../tall/IndexEdgeDeserializable.scala          |  15 +-
 .../wide/IndexEdgeDeserializable.scala          |  15 +-
 .../tall/SnapshotEdgeDeserializable.scala       |  12 +-
 .../wide/SnapshotEdgeDeserializable.scala       |  13 +-
 .../vertex/tall/VertexDeserializable.scala      |   3 +-
 .../vertex/wide/VertexDeserializable.scala      |   3 +-
 .../org/apache/s2graph/core/S2EdgeTest.scala    |  24 +-
 .../s2graph/core/TestCommonWithModels.scala     |   3 +-
 .../s2graph/core/parsers/WhereParserTest.scala  |  22 +-
 .../s2graph/core/storage/StorageIOTest.scala    |   4 +-
 .../core/storage/hbase/IndexEdgeTest.scala      |   6 +-
 .../process/S2GraphProcessStandardTest.scala    |   4 +-
 .../S2GraphStructureIntegrateTest.scala         |   4 +-
 .../S2GraphStructureStandardTest.scala          |   4 +-
 25 files changed, 354 insertions(+), 418 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index a371b6b..6ecb070 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -67,6 +67,7 @@ object GraphSubscriberHelper extends WithKafka {
   var g: S2Graph = null
   var management: Management = null
   val conns = new scala.collection.mutable.HashMap[String, Connection]()
+  var builder: GraphElementBuilder = null
 
   def toOption(s: String) = {
     s match {
@@ -82,6 +83,7 @@ object GraphSubscriberHelper extends WithKafka {
       val ec = ExecutionContext.Implicits.global
       g = new S2Graph(config)(ec)
       management = new Management(g)
+      builder = g.elementBuilder
     }
   }
 
@@ -106,7 +108,7 @@ object GraphSubscriberHelper extends WithKafka {
                      (statFunc: (String, Int) => Unit): Iterable[GraphElement] = {
     (for (msg <- msgs) yield {
       statFunc("total", 1)
-      g.elementBuilder.toGraphElement(msg, labelMapping) match {
+      builder.toGraphElement(msg, labelMapping) match {
         case Some(e) if e.isInstanceOf[S2Edge] =>
           statFunc("EdgeParseOk", 1)
           e.asInstanceOf[S2Edge]
@@ -122,69 +124,6 @@ object GraphSubscriberHelper extends WithKafka {
     }).toList
   }
 
-//  private def storeRec(zkQuorum: String, tableName: String, puts: List[Put], elementsSize: Int, tryNum: Int)
-//                      (statFunc: (String, Int) => Unit, statPrefix: String = "edge"): Unit = {
-//    if (tryNum <= 0) {
-//      statFunc("errorStore", elementsSize)
-//      throw new RuntimeException(s"retry failed after $maxTryNum")
-//    }
-//    val conn = getConn(zkQuorum)
-//    val mutator = conn.getBufferedMutator(TableName.valueOf(tableName))
-//    //      val table = conn.getTable(TableName.valueOf(tableName))
-//    //      table.setAutoFlush(false, false)
-//
-//    try {
-//      puts.foreach { put => put.setDurability(Durability.ASYNC_WAL) }
-//      mutator.mutate(puts)
-//      //        table.put(puts)
-//      statFunc(s"$statPrefix:storeOk", elementsSize)
-//    } catch {
-//      case e: Throwable =>
-//        e.printStackTrace()
-//        Thread.sleep(sleepPeriod)
-//        storeRec(zkQuorum, tableName, puts, elementsSize, tryNum - 1)(statFunc)
-//    } finally {
-//      mutator.close()
-//      //        table.close()
-//    }
-//  }
-//
-//  def storeDegreeBulk(zkQuorum: String, tableName: String)
-//                     (degrees: Iterable[(String, String, String, Int)], labelMapping: Map[String, String] = Map.empty)
-//                     (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
-//    val counts = HashMap[String, Long]()
-//    val statFunc = storeStat(counts)(mapAccOpt) _
-//
-//    for {
-//      (vertexId, labelName, direction, degreeVal) <- degrees
-//      incrementRequests <- TransferToHFile.buildDegreePutRequests(vertexId, labelName, direction, degreeVal)
-//    } {
-//      storeRec(zkQuorum, tableName, incrementRequests, degrees.size, maxTryNum)(statFunc, "degree")
-//    }
-//    counts
-//  }
-//  def storeBulk(zkQuorum: String, tableName: String)
-//               (msgs: Seq[String], labelMapping: Map[String, String] = Map.empty, autoCreateEdge: Boolean = false)
-//               (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
-//
-//    val counts = HashMap[String, Long]()
-//    val statFunc = storeStat(counts)(mapAccOpt) _
-//    val elements = toGraphElements(msgs, labelMapping)(statFunc)
-//
-//    val puts = elements.flatMap { element =>
-//      element match {
-//        case v: Vertex if v.op == GraphUtil.operations("insert") || v.op == GraphUtil.operations("insertBulk") =>
-//          v.buildPuts()
-//        case e: Edge if e.op == GraphUtil.operations("insert") || e.op == GraphUtil.operations("insertBulk") =>
-//          EdgeWriter(e).insertBulkForLoader(autoCreateEdge)
-//        case _ => Nil
-//      }
-//    } toList
-//
-//    storeRec(zkQuorum, tableName, puts, msgs.size, maxTryNum)(statFunc)
-//    counts
-//  }
-
   def storeStat(counts: HashMap[String, Long])(mapAccOpt: Option[HashMapAccumulable])(key: String, value: Int) = {
     counts.put(key, counts.getOrElse(key, 0L) + value)
     mapAccOpt match {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index a7b4e00..6aaf6fd 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -97,11 +97,11 @@ object TransferToHFile extends SparkApp {
     val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
       throw new RuntimeException(s"$vertexId can not be converted into innerval")
     }
-    val vertex = GraphSubscriberHelper.g.newVertex(SourceVertexId(label.srcColumn, innerVal))
+    val vertex = GraphSubscriberHelper.builder.newVertex(SourceVertexId(label.srcColumn, innerVal))
 
     val ts = System.currentTimeMillis()
     val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-    val edge = GraphSubscriberHelper.g.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
+    val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
 
     edge.edgesWithIndex.flatMap { indexEdge =>
       GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 7d187c6..37ddf06 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -35,7 +35,7 @@ object QueryResult {
         val propsWithTs = Map(LabelMeta.timestamp ->
           InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs))
 
-        val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
+        val edge = graph.elementBuilder.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
         val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label)
         edgeWithScore
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 3e33ed4..3eb1fa7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -817,10 +817,10 @@ object S2Edge {
     val belongLabelIds = Seq(e.getLabelId())
     if (e.getDir() == GraphUtil.directions("in")) {
       val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn)
-      e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
+      e.innerGraph.elementBuilder.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
     } else {
       val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn)
-      e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
+      e.innerGraph.elementBuilder.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
     }
   }
 
@@ -828,10 +828,10 @@ object S2Edge {
     val belongLabelIds = Seq(e.getLabelId())
     if (e.getDir() == GraphUtil.directions("in")) {
       val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn)
-      e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
+      e.innerGraph.elementBuilder.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
     } else {
       val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn)
-      e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
+      e.innerGraph.elementBuilder.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
index 4004a13..dc0baa2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
@@ -80,7 +80,7 @@ class S2EdgeBuilder(edge: S2EdgeLike) {
 
   def updateTgtVertex(id: InnerValLike): S2EdgeLike = {
     val newId = TargetVertexId(edge.tgtVertex.id.column, id)
-    val newTgtVertex = edge.innerGraph.newVertex(newId, edge.tgtVertex.ts, edge.tgtVertex.props)
+    val newTgtVertex = edge.innerGraph.elementBuilder.newVertex(newId, edge.tgtVertex.ts, edge.tgtVertex.props)
     copyEdge(tgtVertex = newTgtVertex)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index f80d5c2..3905442 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.core
 
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.commons.configuration.{BaseConfiguration, Configuration}
@@ -96,6 +96,8 @@ object S2Graph {
   val DefaultColumnName = "vertex"
   val DefaultLabelName = "_s2graph"
 
+  var hbaseExecutor: ExecutorService = _
+
   val graphStrategies: TraversalStrategies =
     TraversalStrategies.GlobalCache.getStrategies(classOf[Graph]).addStrategies(S2GraphStepStrategy.instance)
 
@@ -130,7 +132,14 @@ object S2Graph {
     logger.info(s"[InitStorage]: $storageBackend")
 
     storageBackend match {
-      case "hbase" => new AsynchbaseStorage(graph, config)
+      case "hbase" =>
+        hbaseExecutor  =
+          if (config.getString("hbase.zookeeper.quorum") == "localhost")
+            AsynchbaseStorage.initLocalHBase(config)
+          else
+            null
+
+        new AsynchbaseStorage(graph, config)
       case _ => throw new RuntimeException("not supported storage.")
     }
   }
@@ -150,94 +159,6 @@ object S2Graph {
   }
 }
 
-
-@Graph.OptIns(value = Array(
-  new Graph.OptIn(value = Graph.OptIn.SUITE_PROCESS_STANDARD),
-  new Graph.OptIn(value = Graph.OptIn.SUITE_STRUCTURE_STANDARD)
-))
-@Graph.OptOuts(value = Array(
-  /* Process */
-  /* branch: passed all. */
-  /* filter */
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.DropTest$Traversals", method = "g_V_properties_drop", reason = "please find bug on this case."),
-
-  /* map */
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_both_both_count", reason = "count differ very little. fix this."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX3X_count", reason = "count differ very little. fix this."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX8X_count", reason = "count differ very litter. fix this."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallback", reason = "NullPointerException. fix this."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profile", reason = "java.lang.AssertionError: There should be 3 top-level metrics. fix this."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profileXmetricsX", reason = "expected 2, actual 6. fix this."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profileXmetricsX", reason = "expected 8049, actual 8046. fix this."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profile", reason = "expected 8049, actual 8046. fix this."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profileXmetricsX", reason = "expected 2, actual 6. fix this."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profile", reason = "expected 2, actual 6. fix this."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallbackSideEffect", reason = "NullPointerException. fix this."),
-
-  /* sideEffect */
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_repeatXbothEXcreatedX_subgraphXsgX_outVX_timesX5X_name_dedup", reason = "Expected 5, Actual 6."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_outEXknowsX_subgraphXsgX_name_capXsgX", reason = "Expected 3, Actual 6"),
-
-  /* compliance */
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.CoreTraversalTest", method = "shouldThrowExceptionWhenIdsMixed", reason = "VertexId is not Element."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest", method = "*", reason = "not supported yet."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest", method = "*", reason = "not supported yet."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest", method = "*", reason = "not supported yet."),
-
-  /* Structure */
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateIdEquality", reason = "reference equals on EdgeId is not supported."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateEquality", reason = "reference equals on EdgeId is not supported."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.VertexTest$BasicVertexTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge", reason = "S2Vertex.addEdge behave as upsert."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "reference equals is not supported."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest", method = "shouldNotBeEqualPropertiesAsThereIsDifferentKey", reason = "reference equals is not supported."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveVertices", reason = "random label creation is not supported. all label need to be pre-configured."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnVertex", reason = "Assigning the same ID to an Element update instead of throwing exception."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveEdges", reason = "random label creation is not supported. all label need to be pre-configured."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "Assigning the same ID to an Element update instead of throwing exception."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.1)", reason = "graphson-v2-embedded is not supported."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.5)", reason = "graphson-v2-embedded is not supported."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.5)", reason = "graphson-v2-embedded is not supported."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.1)", reason = "graphson-v2-embedded is not supported."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.25)", reason = "graphson-v2-embedded is not supported."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.25)", reason = "graphson-v2-embedded is not supported."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.DistributionGeneratorTest", method = "*", reason = "non-deterministic test."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GryoTest", method = "shouldSerializeTree", reason = "order of children is reversed. not sure why."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GraphSONTest", method = "shouldSerializeTraversalMetrics", reason = "expected 2, actual 3."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithBOTHEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithINEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexAsReferenceNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithOUTEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdgeAsReference", specific = "graphson-v2-embedded", reason = "no"),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteEdge", specific = "graphson-v2-embedded", reason = "no"),
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdge", specific = "graphson-v2-embedded", reason = "no"),
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoGraphTest", method = "*", reason = "no"), // all failed.
-
-  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoTest", method = "*", reason = "no")
-))
 class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike {
 
   var apacheConfiguration: Configuration = _
@@ -457,10 +378,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
       } else {
         val filterOutFuture = q.queryOption.filterOutQuery match {
           case None => fallback
-          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+          case Some(filterOutQuery) => traversalHelper.getEdgesStepInner(filterOutQuery)
         }
         for {
-          stepResult <- getEdgesStepInner(q)
+          stepResult <- traversalHelper.getEdgesStepInner(q)
           filterOutInnerResult <- filterOutFuture
         } yield {
           if (filterOutInnerResult.isEmpty) stepResult
@@ -480,7 +401,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
       else {
         val filterOutFuture = mq.queryOption.filterOutQuery match {
           case None => fallback
-          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+          case Some(filterOutQuery) => traversalHelper.getEdgesStepInner(filterOutQuery)
         }
 
         val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
@@ -515,7 +436,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
     }
 
     val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
-      fetchAndDeleteAll(queries, requestTs)
+      traversalHelper.fetchAndDeleteAll(queries, requestTs)
     } { case (allDeleted, deleteSuccess) =>
       allDeleted && deleteSuccess
     }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
@@ -546,12 +467,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
   }
 
   override def getVertex(vertexId: VertexId): Option[S2VertexLike] = {
-    val v = newVertex(vertexId)
+    val v = elementBuilder.newVertex(vertexId)
     Await.result(getVertices(Seq(v)).map { vertices => vertices.headOption }, WaitTimeout)
   }
 
   override def fetchEdges(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge] = {
-    Await.result(fetchEdgesAsync(vertex, labelNameWithDirs), WaitTimeout)
+    Await.result(traversalHelper.fetchEdgesAsync(vertex, labelNameWithDirs), WaitTimeout)
   }
 
   override def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] = {
@@ -574,149 +495,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
         }
       }
 
-    fetchEdgesAsync(vertex, labelNameWithDirs.distinct)
+    traversalHelper.fetchEdgesAsync(vertex, labelNameWithDirs.distinct)
   }
 
   def isRunning(): Boolean = running.get()
-
-  /** Private **/
-
-  private def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
-    Try {
-      if (q.steps.isEmpty) fallback
-      else {
-        def fetch: Future[StepResult] = {
-          val startStepInnerResult = QueryResult.fromVertices(this, q)
-          q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
-            for {
-              prevStepInnerResult <- prevStepInnerResultFuture
-              currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult)
-            } yield {
-              currentStepInnerResult.copy(
-                accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors,
-                failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount
-              )
-            }
-          }
-        }
-
-        fetch
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        fallback
-    } get
-  }
-
-  private def fetchStep(orgQuery: Query,
-                        stepIdx: Int,
-                        stepInnerResult: StepResult,
-                        buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
-    if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
-    else {
-      val (_, prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) =
-        traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult)
-
-      val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
-
-      traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests,
-        fetchedLs, orgQuery.steps(stepIdx).queryParams, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec)
-    }
-  }
-
-  private def fetches(queryRequests: Seq[QueryRequest],
-                      prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
-
-    val reqWithIdxs = queryRequests.zipWithIndex
-    val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label)
-    val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
-      for {
-        prev <- prevFuture
-        cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
-      } yield {
-        prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
-      }
-    }
-    aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) }
-  }
-
-  private def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
-    val futures = queries.map(getEdgesStepInner(_, true))
-    val future = for {
-      stepInnerResultLs <- Future.sequence(futures)
-      (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
-    } yield {
-      //        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
-      (allDeleted, ret)
-    }
-
-    Extensions.retryOnFailure(MaxRetryNum) {
-      future
-    } {
-      logger.error(s"fetch and deleteAll failed.")
-      (true, false)
-    }
-
-  }
-
-  private def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
-                                      requestTs: Long): Future[(Boolean, Boolean)] = {
-    stepInnerResultLs.foreach { stepInnerResult =>
-      if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
-    }
-    val futures = for {
-      stepInnerResult <- stepInnerResultLs
-      filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
-        (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
-      }
-      edgesToDelete = elementBuilder.buildEdgesToDelete(filtered, requestTs)
-      if edgesToDelete.nonEmpty
-    } yield {
-      val head = edgesToDelete.head
-      val label = head.edge.innerLabel
-      val stepResult = StepResult(edgesToDelete, Nil, Nil, false)
-      val ret = label.schemaVersion match {
-        case HBaseType.VERSION3 | HBaseType.VERSION4 =>
-          if (label.consistencyLevel == "strong") {
-            /*
-              * read: snapshotEdge on queryResult = O(N)
-              * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
-              */
-            mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
-          } else {
-            getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
-          }
-        case _ =>
-
-          /*
-            * read: x
-            * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
-            */
-          getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
-      }
-      ret
-    }
-
-    if (futures.isEmpty) {
-      // all deleted.
-      Future.successful(true -> true)
-    } else {
-      Future.sequence(futures).map { rets => false -> rets.forall(identity) }
-    }
-  }
-
-  private def fetchEdgesAsync(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): Future[util.Iterator[Edge]] = {
-    val queryParams = labelNameWithDirs.map { case (l, direction) =>
-      QueryParam(labelName = l, direction = direction.toLowerCase)
-    }
-
-    val query = Query.toQuery(Seq(vertex), queryParams)
-    val queryRequests = queryParams.map { param => QueryRequest(query, 0, vertex, param) }
-    val ls = new util.ArrayList[Edge]()
-    fetches(queryRequests, Map.empty).map { stepResultLs =>
-      stepResultLs.foreach(_.edgeWithScores.foreach(es => ls.add(es.edge)))
-      ls.iterator()
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index a58f1e0..1b80cb4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -58,8 +58,6 @@ trait S2GraphLike extends Graph {
 
   override def features() = s2Features
 
-  def nextLocalLongId = localLongId.getAndIncrement()
-
   def fallback = Future.successful(StepResult.Empty)
 
   def defaultStorage: Storage
@@ -103,34 +101,6 @@ trait S2GraphLike extends Graph {
   def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]]
 
   /** Convert to Graph Element **/
-  def newEdge(srcVertex: S2VertexLike,
-              tgtVertex: S2VertexLike,
-              innerLabel: Label,
-              dir: Int,
-              op: Byte = GraphUtil.defaultOpByte,
-              version: Long = System.currentTimeMillis(),
-              propsWithTs: S2Edge.State,
-              parentEdges: Seq[EdgeWithScore] = Nil,
-              originalEdgeOpt: Option[S2EdgeLike] = None,
-              pendingEdgeOpt: Option[S2EdgeLike] = None,
-              statusCode: Byte = 0,
-              lockTs: Option[Long] = None,
-              tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike =
-    elementBuilder.newEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs,
-      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
-
-  def newVertexId(service: Service,
-                  column: ServiceColumn,
-                  id: Any): VertexId =
-    elementBuilder.newVertexId(service, column, id)
-
-  def newVertex(id: VertexId,
-                ts: Long = System.currentTimeMillis(),
-                props: S2Vertex.Props = S2Vertex.EmptyProps,
-                op: Byte = 0,
-                belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike =
-    elementBuilder.newVertex(id, ts, props, op, belongLabelIds)
-
   def toVertex(serviceName: String,
                columnName: String,
                id: Any,
@@ -261,7 +231,7 @@ trait S2GraphLike extends Graph {
 
     val vertex = kvsMap.get(T.id.name()) match {
       case None => // do nothing
-        val id = nextLocalLongId
+        val id = localLongId.getAndIncrement()
         makeVertex(Long.box(id), kvsMap)
       case Some(idValue) if S2Property.validType(idValue) =>
         makeVertex(idValue, kvsMap)
@@ -269,7 +239,7 @@ trait S2GraphLike extends Graph {
         throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported
     }
 
-    addVertexInner(vertex)
+    addVertex(vertex.id, vertex.ts, vertex.props, vertex.op, vertex.belongLabelIds)
 
     vertex
   }
@@ -290,17 +260,6 @@ trait S2GraphLike extends Graph {
     vertex
   }
 
-  def addVertexInner(vertex: S2VertexLike): S2VertexLike = {
-    val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
-      if (rets.forall(_.isSuccess)) {
-        indexProvider.mutateVerticesAsync(Seq(vertex))
-      } else throw new RuntimeException("addVertex failed.")
-    }
-    Await.ready(future, WaitTimeout)
-
-    vertex
-  }
-
   /* tp3 only */
   def addEdge(srcVertex: S2VertexLike, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = {
     val containsId = kvs.contains(T.id)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala
new file mode 100644
index 0000000..5ce4086
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala
@@ -0,0 +1,98 @@
+package org.apache.s2graph.core
+
+import com.typesafe.config.Config
+import org.apache.tinkerpop.gremlin.structure.Graph
+
+import scala.concurrent.ExecutionContext
+
+
+@Graph.OptIns(value = Array(
+  new Graph.OptIn(value = Graph.OptIn.SUITE_PROCESS_STANDARD),
+  new Graph.OptIn(value = Graph.OptIn.SUITE_STRUCTURE_STANDARD)
+))
+@Graph.OptOuts(value = Array(
+  /* Process */
+  /* branch: passed all. */
+  /* filter */
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.DropTest$Traversals", method = "g_V_properties_drop", reason = "please find bug on this case."),
+
+  /* map */
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_both_both_count", reason = "count differ very little. fix this."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX3X_count", reason = "count differ very little. fix this."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX8X_count", reason = "count differ very litter. fix this."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallback", reason = "NullPointerException. fix this."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profile", reason = "java.lang.AssertionError: There should be 3 top-level metrics. fix this."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profileXmetricsX", reason = "expected 2, actual 6. fix this."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profileXmetricsX", reason = "expected 8049, actual 8046. fix this."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profile", reason = "expected 8049, actual 8046. fix this."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profileXmetricsX", reason = "expected 2, actual 6. fix this."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profile", reason = "expected 2, actual 6. fix this."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallbackSideEffect", reason = "NullPointerException. fix this."),
+
+  /* sideEffect */
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_repeatXbothEXcreatedX_subgraphXsgX_outVX_timesX5X_name_dedup", reason = "Expected 5, Actual 6."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_outEXknowsX_subgraphXsgX_name_capXsgX", reason = "Expected 3, Actual 6"),
+
+  /* compliance */
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.CoreTraversalTest", method = "shouldThrowExceptionWhenIdsMixed", reason = "VertexId is not Element."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest", method = "*", reason = "not supported yet."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest", method = "*", reason = "not supported yet."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest", method = "*", reason = "not supported yet."),
+
+  /* Structure */
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateIdEquality", reason = "reference equals on EdgeId is not supported."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateEquality", reason = "reference equals on EdgeId is not supported."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.VertexTest$BasicVertexTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge", reason = "S2Vertex.addEdge behave as upsert."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "reference equals is not supported."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest", method = "shouldNotBeEqualPropertiesAsThereIsDifferentKey", reason = "reference equals is not supported."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveVertices", reason = "random label creation is not supported. all label need to be pre-configured."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnVertex", reason = "Assigning the same ID to an Element update instead of throwing exception."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveEdges", reason = "random label creation is not supported. all label need to be pre-configured."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "Assigning the same ID to an Element update instead of throwing exception."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.1)", reason = "graphson-v2-embedded is not supported."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.5)", reason = "graphson-v2-embedded is not supported."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.5)", reason = "graphson-v2-embedded is not supported."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.1)", reason = "graphson-v2-embedded is not supported."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.25)", reason = "graphson-v2-embedded is not supported."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.25)", reason = "graphson-v2-embedded is not supported."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.DistributionGeneratorTest", method = "*", reason = "non-deterministic test."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GryoTest", method = "shouldSerializeTree", reason = "order of children is reversed. not sure why."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GraphSONTest", method = "shouldSerializeTraversalMetrics", reason = "expected 2, actual 3."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithBOTHEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithINEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexAsReferenceNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithOUTEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdgeAsReference", specific = "graphson-v2-embedded", reason = "no"),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteEdge", specific = "graphson-v2-embedded", reason = "no"),
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdge", specific = "graphson-v2-embedded", reason = "no"),
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoGraphTest", method = "*", reason = "no"), // all failed.
+
+  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoTest", method = "*", reason = "no")
+))
+class S2GraphTp(config: Config)(override implicit val ec: ExecutionContext) extends S2Graph(config) {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index ad35efd..97e3095 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -118,7 +118,8 @@ trait S2VertexLike extends Vertex with GraphElement {
         props.put(key, newProps)
 
         // FIXME: save to persistent for tp test
-        graph.addVertexInner(this)
+//        graph.addVertexInner(this)
+        graph.addVertex(id, ts, props, op, belongLabelIds)
         newProps
       case _ => throw new RuntimeException("only single cardinality is supported.")
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 25b909e..7a8e63e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -1,14 +1,17 @@
 package org.apache.s2graph.core
 
+import java.util
+
 import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
 import org.apache.s2graph.core.mysqls.LabelMeta
-import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection, VertexId}
-import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, VertexId}
+import org.apache.s2graph.core.utils.{Extensions, logger}
+import org.apache.tinkerpop.gremlin.structure.Edge
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.concurrent.Future
-import scala.util.Random
+import scala.util.{Random, Try}
 
 object TraversalHelper {
   @tailrec
@@ -125,6 +128,150 @@ object TraversalHelper {
 class TraversalHelper(graph: S2GraphLike) {
   import TraversalHelper._
 
+  implicit val ec = graph.ec
+  val MaxRetryNum = graph.config.getInt("max.retry.number")
+
+  def fallback = Future.successful(StepResult.Empty)
+
+  def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
+    Try {
+      if (q.steps.isEmpty) fallback
+      else {
+        def fetch: Future[StepResult] = {
+          val startStepInnerResult = QueryResult.fromVertices(graph, q)
+          q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
+            for {
+              prevStepInnerResult <- prevStepInnerResultFuture
+              currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult)
+            } yield {
+              currentStepInnerResult.copy(
+                accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors,
+                failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount
+              )
+            }
+          }
+        }
+
+        fetch
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesAsync: $e", e)
+        fallback
+    } get
+  }
+
+  def fetchStep(orgQuery: Query,
+                stepIdx: Int,
+                stepInnerResult: StepResult,
+                buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
+    if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
+    else {
+      val (_, prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) =
+        graph.traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult)
+
+      val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
+
+      graph.traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests,
+        fetchedLs, orgQuery.steps(stepIdx).queryParams, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(graph.ec)
+    }
+  }
+
+  def fetches(queryRequests: Seq[QueryRequest],
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
+
+    val reqWithIdxs = queryRequests.zipWithIndex
+    val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label)
+    val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
+      for {
+        prev <- prevFuture
+        cur <- graph.getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
+      } yield {
+        prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
+      }
+    }
+    aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) }
+  }
+
+  def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
+    val futures = queries.map(getEdgesStepInner(_, true))
+    val future = for {
+      stepInnerResultLs <- Future.sequence(futures)
+      (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
+    } yield {
+      //        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
+      (allDeleted, ret)
+    }
+
+    Extensions.retryOnFailure(MaxRetryNum) {
+      future
+    } {
+      logger.error(s"fetch and deleteAll failed.")
+      (true, false)
+    }
+
+  }
+
+  def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
+                              requestTs: Long): Future[(Boolean, Boolean)] = {
+    stepInnerResultLs.foreach { stepInnerResult =>
+      if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
+    }
+    val futures = for {
+      stepInnerResult <- stepInnerResultLs
+      filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
+        (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
+      }
+      edgesToDelete = graph.elementBuilder.buildEdgesToDelete(filtered, requestTs)
+      if edgesToDelete.nonEmpty
+    } yield {
+      val head = edgesToDelete.head
+      val label = head.edge.innerLabel
+      val stepResult = StepResult(edgesToDelete, Nil, Nil, false)
+      val ret = label.schemaVersion match {
+        case HBaseType.VERSION3 | HBaseType.VERSION4 =>
+          if (label.consistencyLevel == "strong") {
+            /*
+              * read: snapshotEdge on queryResult = O(N)
+              * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
+              */
+            graph.mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
+          } else {
+            graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+          }
+        case _ =>
+
+          /*
+            * read: x
+            * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
+            */
+          graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+      }
+      ret
+    }
+
+    if (futures.isEmpty) {
+      // all deleted.
+      Future.successful(true -> true)
+    } else {
+      Future.sequence(futures).map { rets => false -> rets.forall(identity) }
+    }
+  }
+
+  def fetchEdgesAsync(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): Future[util.Iterator[Edge]] = {
+    val queryParams = labelNameWithDirs.map { case (l, direction) =>
+      QueryParam(labelName = l, direction = direction.toLowerCase)
+    }
+
+    val query = Query.toQuery(Seq(vertex), queryParams)
+    val queryRequests = queryParams.map { param => QueryRequest(query, 0, vertex, param) }
+    val ls = new util.ArrayList[Edge]()
+    fetches(queryRequests, Map.empty).map { stepResultLs =>
+      stepResultLs.foreach(_.edgeWithScores.foreach(es => ls.add(es.edge)))
+      ls.iterator()
+    }
+  }
+
   def buildNextStepQueryRequests(orgQuery: Query, stepIdx: Int, stepInnerResult: StepResult) = {
     val edgeWithScoreLs = stepInnerResult.edgeWithScores
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 54b865a..55a3638 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -269,7 +269,7 @@ class RequestParser(graph: S2GraphLike) {
       id <- ids
       innerId <- jsValueToInnerVal(id, serviceColumn.columnType, label.schemaVersion)
     } yield {
-      graph.newVertex(SourceVertexId(serviceColumn, innerId), System.currentTimeMillis())
+      graph.elementBuilder.newVertex(SourceVertexId(serviceColumn, innerId), System.currentTimeMillis())
     }
 
     vertices

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 6ebc90c..945f246 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -36,6 +36,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
 
    type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
    type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
+   val builder = graph.elementBuilder
 
    override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
                                                cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = {
@@ -61,7 +62,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
        else {
          val label = Label.findById(labelWithDir.labelId)
          val schemaVer = label.schemaVersion
-         val srcVertex = graph.newVertex(srcVertexId, version)
+         val srcVertex = builder.newVertex(srcVertexId, version)
          var tsVal = version
          val isTallSchema = tallSchemaVersions(label.schemaVersion)
          val isDegree = if (isTallSchema) pos == kv.row.length else kv.qualifier.isEmpty
@@ -71,13 +72,13 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
            //      val degreeVal = Bytes.toLong(kv.value)
            val degreeVal = bytesToLongFunc(kv.value, 0)
            val tgtVertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("0", schemaVer))
-           val tgtVertex = graph.newVertex(tgtVertexId, version)
-           val edge = graph.newEdge(srcVertex, tgtVertex,
+           val tgtVertex = builder.newVertex(tgtVertexId, version)
+           val edge = graph.elementBuilder.newEdge(srcVertex, tgtVertex,
              label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState)
 
            edge.propertyInner(LabelMeta.timestamp.name, version, version)
            edge.propertyInner(LabelMeta.degree.name, degreeVal, version)
-           edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+           edge.tgtVertex = builder.newVertex(tgtVertexId, version)
            edge.setOp(GraphUtil.defaultOpByte)
            edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
 
@@ -113,8 +114,8 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
                else kv.qualifier(kv.qualifier.length - 1)
              }
 
-           val tgtVertex = graph.newVertex(tgtVertexIdRaw, version)
-           val edge = graph.newEdge(srcVertex, tgtVertex,
+           val tgtVertex = builder.newVertex(tgtVertexIdRaw, version)
+           val edge = graph.elementBuilder.newEdge(srcVertex, tgtVertex,
              label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState)
 
            val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
@@ -150,7 +151,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
 
            if (edge.checkProperty(LabelMeta.to.name)) {
              val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
-             val tgtVertex = graph.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version)
+             val tgtVertex = builder.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version)
              edge.setTgtVertex(tgtVertex)
            }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 79e3f2e..e533b4b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -32,6 +32,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
 
    type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
    type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
+   val builder = graph.elementBuilder
 
    override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
                                                cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = {
@@ -55,20 +56,20 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
        else {
          val label = Label.findById(labelWithDir.labelId)
          val schemaVer = label.schemaVersion
-         val srcVertex = graph.newVertex(srcVertexId, version)
+         val srcVertex = builder.newVertex(srcVertexId, version)
          //TODO:
          var tsVal = version
 
          if (kv.qualifier.isEmpty) {
            val degreeVal = bytesToLongFunc(kv.value, 0)
            val tgtVertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("0", schemaVer))
-           val tgtVertex = graph.newVertex(tgtVertexId, version)
-           val edge = graph.newEdge(srcVertex, tgtVertex,
+           val tgtVertex = builder.newVertex(tgtVertexId, version)
+           val edge = builder.newEdge(srcVertex, tgtVertex,
              label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState)
 
            edge.propertyInner(LabelMeta.timestamp.name, version, version)
            edge.propertyInner(LabelMeta.degree.name, degreeVal, version)
-           edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+           edge.tgtVertex = builder.newVertex(tgtVertexId, version)
            edge.setOp(GraphUtil.defaultOpByte)
            edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
 
@@ -88,8 +89,8 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
              if (kv.qualifier.length == pos) GraphUtil.defaultOpByte
              else kv.qualifier(kv.qualifier.length-1)
 
-           val tgtVertex = graph.newVertex(tgtVertexIdRaw, version)
-           val edge = graph.newEdge(srcVertex, tgtVertex,
+           val tgtVertex = builder.newVertex(tgtVertexIdRaw, version)
+           val edge = builder.newEdge(srcVertex, tgtVertex,
              label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState)
 
            val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
@@ -124,7 +125,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
            /* process tgtVertexId */
            if (edge.checkProperty(LabelMeta.to.name)) {
              val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
-             val tgtVertex = graph.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version)
+             val tgtVertex = builder.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version)
              edge.setTgtVertex(tgtVertex)
            }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index 4c97f6e..b7f5ba1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -28,7 +28,7 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.storage.serde.Deserializable
 
 class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[SnapshotEdge] {
-
+  val builder = graph.elementBuilder
   def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
     val statusCode = byte >> 4
     val op = byte & ((1 << 4) - 1)
@@ -89,15 +89,19 @@ class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[Snap
             val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
 
             val pendingEdge =
-              graph.newEdge(graph.newVertex(srcVertexId, version),
-                graph.newVertex(tgtVertexId, version),
+              builder.newEdge(
+                builder.newVertex(srcVertexId, version),
+                builder.newVertex(tgtVertexId, version),
                 label, labelWithDir.dir, pendingEdgeOp,
                 version, pendingEdgeProps.toMap,
                 statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
+
             Option(pendingEdge)
           }
 
-        val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
+        val snapshotEdge = builder.newSnapshotEdge(
+          builder.newVertex(srcVertexId, ts),
+          builder.newVertex(tgtVertexId, ts),
           label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode,
           pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index a6a64a2..7dec6d9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -28,7 +28,7 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.storage.serde.Deserializable
 
 class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[SnapshotEdge] {
-
+  val builder = graph.elementBuilder
   def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
     val statusCode = byte >> 4
     val op = byte & ((1 << 4) - 1)
@@ -55,7 +55,7 @@ class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[Snap
       else {
         val label = Label.findById(labelWithDir.labelId)
         val schemaVer = label.schemaVersion
-        val srcVertex = graph.newVertex(srcVertexId, version)
+        val srcVertex = builder.newVertex(srcVertexId, version)
 
         val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
 
@@ -80,15 +80,18 @@ class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[Snap
             val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
 
             val pendingEdge =
-              graph.newEdge(graph.newVertex(srcVertexId, version),
-                graph.newVertex(tgtVertexId, version),
+              builder.newEdge(
+                builder.newVertex(srcVertexId, version),
+                builder.newVertex(tgtVertexId, version),
                 label, labelWithDir.dir, pendingEdgeOp,
                 version, pendingEdgeProps.toMap,
                 statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
             Option(pendingEdge)
           }
 
-        val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
+        val snapshotEdge = builder.newSnapshotEdge(
+          builder.newVertex(srcVertexId, ts),
+          builder.newVertex(tgtVertexId, ts),
           label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode,
           pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
index 70160a8..5576017 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
@@ -28,6 +28,7 @@ import org.apache.s2graph.core.{S2Graph, S2GraphLike, S2Vertex, S2VertexLike}
 
 class VertexDeserializable(graph: S2GraphLike,
                            bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2VertexLike] {
+  val builder = graph.elementBuilder
   def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
                                           cacheElementOpt: Option[S2VertexLike]): Option[S2VertexLike] = {
     try {
@@ -47,7 +48,7 @@ class VertexDeserializable(graph: S2GraphLike,
         propsMap += (columnMeta -> innerVal)
       }
 
-      val vertex = graph.newVertex(vertexId, kv.timestamp, S2Vertex.EmptyProps, belongLabelIds = Nil)
+      val vertex = builder.newVertex(vertexId, kv.timestamp, S2Vertex.EmptyProps, belongLabelIds = Nil)
       S2Vertex.fillPropsWithTs(vertex, propsMap.toMap)
 
       Option(vertex)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
index 002f577..d1d4d7d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
@@ -30,6 +30,7 @@ import scala.collection.mutable.ListBuffer
 
 class VertexDeserializable(graph: S2GraphLike,
                            bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2VertexLike] {
+  val builder = graph.elementBuilder
   def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
                                           cacheElementOpt: Option[S2VertexLike]): Option[S2VertexLike] = {
     try {
@@ -62,7 +63,7 @@ class VertexDeserializable(graph: S2GraphLike,
         }
       }
       assert(maxTs != Long.MinValue)
-      val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds)
+      val vertex = builder.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds)
       S2Vertex.fillPropsWithTs(vertex, propsMap.toMap)
 
       Option(vertex)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
index 94883c9..2b439bc 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
@@ -66,14 +66,14 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
   test("buildOperation") {
     val schemaVersion = "v2"
     val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
-    val srcVertex = graph.newVertex(vertexId)
+    val srcVertex = builder.newVertex(vertexId)
     val tgtVertex = srcVertex
 
     val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
 
     val snapshotEdge = None
     val propsWithTs = Map(timestampProp)
-    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
+    val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
 
     val newVersion = 0L
 
@@ -93,14 +93,14 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
   test("buildMutation: snapshotEdge: None with newProps") {
     val schemaVersion = "v2"
     val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
-    val srcVertex = graph.newVertex(vertexId)
+    val srcVertex = builder.newVertex(vertexId)
     val tgtVertex = srcVertex
 
     val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
 
     val snapshotEdge = None
     val propsWithTs = Map(timestampProp)
-    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
+    val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
 
     val newVersion = 0L
 
@@ -120,14 +120,14 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
   test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") {
     val schemaVersion = "v2"
     val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
-    val srcVertex = graph.newVertex(vertexId)
+    val srcVertex = builder.newVertex(vertexId)
     val tgtVertex = srcVertex
 
     val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
 
     val snapshotEdge = None
     val propsWithTs = Map(timestampProp)
-    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
+    val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
 
     val newVersion = 0L
 
@@ -144,7 +144,7 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
   test("buildMutation: All props older than snapshotEdge's LastDeletedAt") {
     val schemaVersion = "v2"
     val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
-    val srcVertex = graph.newVertex(vertexId)
+    val srcVertex = builder.newVertex(vertexId)
     val tgtVertex = srcVertex
 
     val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
@@ -159,12 +159,12 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
       LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3)
     )
 
-    val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs)
+    val _snapshotEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs)
 
     val snapshotEdge = Option(_snapshotEdge)
 
 
-    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
+    val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
 
     val newVersion = 0L
     val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)
@@ -178,7 +178,7 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
   test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") {
     val schemaVersion = "v2"
     val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
-    val srcVertex = graph.newVertex(vertexId)
+    val srcVertex = builder.newVertex(vertexId)
     val tgtVertex = srcVertex
 
     val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
@@ -193,11 +193,11 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
       LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3)
     )
 
-    val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs)
+    val _snapshotEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs)
 
     val snapshotEdge = Option(_snapshotEdge)
 
-    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
+    val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
 
     val newVersion = 0L
     val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index 4614bed..6ac77e4 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -35,12 +35,13 @@ trait TestCommonWithModels {
   var graph: S2Graph = _
   var config: Config = _
   var management: Management = _
+  var builder: GraphElementBuilder = _
 
   def initTests() = {
     config = ConfigFactory.load()
     graph = new S2Graph(config)(ExecutionContext.Implicits.global)
     management = new Management(graph)
-
+    builder = graph.elementBuilder
     implicit val session = AutoSession
 
     deleteTestLabel()