You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/30 12:07:18 UTC

[2/7] incubator-s2graph git commit: [S2GRAPH-130]: Edge.propsWithTs data type should be changed into mutable to support setter interface exist in tp3. - Make Vertex/Edge/Graph to implement Tinkerpop3. - Change data type of Edge's propsWithTs to java.ut

[S2GRAPH-130]: Edge.propsWithTs data type should be changed into mutable to support setter interface exist in tp3.
 - Make Vertex/Edge/Graph to implement Tinkerpop3.
 - Change data type of Edge's propsWithTs to java.util.Map[String, S2Property[_]].


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6356573e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6356573e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6356573e

Branch: refs/heads/master
Commit: 6356573e6a658dbfeb240bdee642d055991e5ac2
Parents: 292174e
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Nov 24 12:14:08 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 24 12:14:08 2016 +0900

----------------------------------------------------------------------
 .../loader/subscriber/GraphSubscriber.scala     |   2 +-
 .../loader/subscriber/TransferToHFile.scala     |   4 +-
 .../s2graph/loader/subscriber/WalLogStat.scala  |   2 +-
 .../loader/subscriber/WalLogToHDFS.scala        |   2 +-
 .../scala/org/apache/s2graph/core/Edge.scala    | 442 +++++++++++++-----
 .../scala/org/apache/s2graph/core/Graph.scala   | 246 +++++++---
 .../org/apache/s2graph/core/QueryParam.scala    |   4 +-
 .../org/apache/s2graph/core/QueryResult.scala   |   2 +-
 .../org/apache/s2graph/core/S2Property.scala    |  33 +-
 .../scala/org/apache/s2graph/core/Vertex.scala  |  24 +-
 .../s2graph/core/rest/RequestParser.scala       |   8 +-
 .../apache/s2graph/core/storage/Storage.scala   |  58 ++-
 .../tall/IndexEdgeDeserializable.scala          |  80 ++--
 .../wide/IndexEdgeDeserializable.scala          |  78 ++--
 .../tall/SnapshotEdgeDeserializable.scala       |   8 +-
 .../wide/SnapshotEdgeDeserializable.scala       |   8 +-
 .../org/apache/s2graph/core/EdgeTest.scala      | 457 +++----------------
 .../core/Integrate/IntegrateCommon.scala        |   2 +
 .../core/Integrate/WeakLabelDeleteTest.scala    |   5 +-
 .../s2graph/core/parsers/WhereParserTest.scala  |  31 +-
 .../core/storage/hbase/IndexEdgeTest.scala      | 206 ++++-----
 .../loader/core/CounterEtlFunctions.scala       |   5 +-
 22 files changed, 850 insertions(+), 857 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index 05aed34..b25bc84 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -106,7 +106,7 @@ object GraphSubscriberHelper extends WithKafka {
                      (statFunc: (String, Int) => Unit): Iterable[GraphElement] = {
     (for (msg <- msgs) yield {
       statFunc("total", 1)
-      Graph.toGraphElement(msg, labelMapping) match {
+      g.toGraphElement(msg, labelMapping) match {
         case Some(e) if e.isInstanceOf[Edge] =>
           statFunc("EdgeParseOk", 1)
           e.asInstanceOf[Edge]

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 9ebff03..3345d56 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -101,7 +101,7 @@ object TransferToHFile extends SparkApp {
 
     val ts = System.currentTimeMillis()
     val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-    val edge = Edge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
+    val edge = GraphSubscriberHelper.g.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
 
     edge.edgesWithIndex.flatMap { indexEdge =>
       GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
@@ -125,7 +125,7 @@ object TransferToHFile extends SparkApp {
   def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
     val kvs = for {
       s <- strs
-      element <- Graph.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge]
+      element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge]
       edge = element.asInstanceOf[Edge]
       putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate)
     } yield {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
index d47e648..5b68754 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
@@ -69,7 +69,7 @@ object WalLogStat extends SparkApp with WithKafka {
         val phase = System.getProperty("phase")
         GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
         partition.map { case (key, msg) =>
-          Graph.toGraphElement(msg) match {
+          GraphSubscriberHelper.g.toGraphElement(msg) match {
             case Some(elem) =>
               val serviceName = elem.serviceName
               msg.split("\t", 7) match {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
index a8fc4df..0f69dc7 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
@@ -92,7 +92,7 @@ object WalLogToHDFS extends SparkApp with WithKafka {
         GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
 
         partition.flatMap { case (key, msg) =>
-          val optMsg = Graph.toGraphElement(msg).flatMap { element =>
+          val optMsg = GraphSubscriberHelper.g.toGraphElement(msg).flatMap { element =>
             val arr = msg.split("\t", 7)
             val service = element.serviceName
             val label = arr(5)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index b27a05e..87f9cd7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -20,50 +20,57 @@
 package org.apache.s2graph.core
 
 import java.util
+import java.util.function.BiConsumer
 
+import org.apache.s2graph.core.Edge.{Props, State}
 import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.logger
 import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.{Direction, Edge => TpEdge, Graph => TpGraph, Property}
 import play.api.libs.json.{JsNumber, JsObject, Json}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{Map => MutableMap}
 import scala.util.hashing.MurmurHash3
-import org.apache.tinkerpop.gremlin.structure.{Edge => TpEdge, Direction, Property, Graph => TpGraph}
 
-case class SnapshotEdge(srcVertex: Vertex,
+case class SnapshotEdge(graph: Graph,
+                        srcVertex: Vertex,
                         tgtVertex: Vertex,
                         label: Label,
-                        direction: Int,
+                        dir: Int,
                         op: Byte,
                         version: Long,
-                        private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
+                        private val propsWithTs: Props,
                         pendingEdgeOpt: Option[Edge],
                         statusCode: Byte = 0,
                         lockTs: Option[Long],
                         tsInnerValOpt: Option[InnerValLike] = None) {
-
-  lazy val labelWithDir = LabelWithDirection(label.id.get, direction)
-  if (!propsWithTs.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
+  lazy val direction = GraphUtil.fromDirection(dir)
+  lazy val operation = GraphUtil.fromOp(op)
+  lazy val edge = toEdge
+  lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
+//  if (!propsWithTs.contains(LabelMeta.timestamp.name)) throw new Exception("Timestamp is required.")
 
 //  val label = Label.findById(labelWithDir.labelId)
   lazy val schemaVer = label.schemaVersion
-  lazy val propsWithoutTs = propsWithTs.mapValues(_.innerVal)
-  lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.toString().toLong
+  lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong
 
-  def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.map(kv => kv._1.seq -> kv._2).toSeq)
+  def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
 
   def allPropsDeleted = Edge.allPropsDeleted(propsWithTs)
 
   def toEdge: Edge = {
-    val ts = propsWithTs.get(LabelMeta.timestamp).map(v => v.ts).getOrElse(version)
-    Edge(srcVertex, tgtVertex, label, direction, op,
+    Edge(graph, srcVertex, tgtVertex, label, dir, op,
       version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt,
       statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
   }
 
   def propsWithName = (for {
-    (meta, v) <- propsWithTs
+    (_, v) <- propsWithTs.asScala
+    meta = v.labelMeta
     jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
   } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
 
@@ -71,26 +78,55 @@ case class SnapshotEdge(srcVertex: Vertex,
   def toLogString() = {
     List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t")
   }
+
+  def property[V](key: String, value: V, ts: Long): S2Property[V] = {
+    val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge."))
+    val newProps = new S2Property(edge, labelMeta, key, value, ts)
+    propsWithTs.put(key, newProps)
+    newProps
+  }
+  override def hashCode(): Int = {
+    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case e: SnapshotEdge =>
+      srcVertex.innerId == e.srcVertex.innerId &&
+        tgtVertex.innerId == e.tgtVertex.innerId &&
+        labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
+        pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode
+    case _ => false
+  }
+
+  override def toString(): String = {
+    Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction,
+      "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
+      "statusCode" -> statusCode, "lockTs" -> lockTs).toString
+  }
 }
 
-case class IndexEdge(srcVertex: Vertex,
+case class IndexEdge(graph: Graph,
+                     srcVertex: Vertex,
                      tgtVertex: Vertex,
                      label: Label,
-                     direction: Int,
+                     dir: Int,
                      op: Byte,
                      version: Long,
                      labelIndexSeq: Byte,
-                     private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
+                     private val propsWithTs: Props,
                      tsInnerValOpt: Option[InnerValLike] = None)  {
 //  if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
   //  assert(props.contains(LabelMeta.timeStampSeq))
-  lazy val labelWithDir = LabelWithDirection(label.id.get, direction)
+  lazy val direction = GraphUtil.fromDirection(dir)
+  lazy val operation = GraphUtil.fromOp(op)
+  lazy val edge = toEdge
+  lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
 
   lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in")
   lazy val isOutEdge = !isInEdge
 
-  lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.toString.toLong
-  lazy val degreeEdge = propsWithTs.contains(LabelMeta.degree)
+  lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString.toLong
+  lazy val degreeEdge = propsWithTs.containsKey(LabelMeta.degree.name)
 
   lazy val schemaVer = label.schemaVersion
   lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
@@ -103,8 +139,8 @@ case class IndexEdge(srcVertex: Vertex,
 
   /** TODO: make sure call of this class fill props as this assumes */
   lazy val orders = for (meta <- labelIndexMetaSeqs) yield {
-    propsWithTs.get(meta) match {
-      case None =>
+    propsWithTs.get(meta.name) match {
+      case null =>
 
         /**
           * TODO: agly hack
@@ -120,12 +156,12 @@ case class IndexEdge(srcVertex: Vertex,
         }
 
         meta -> v
-      case Some(v) => meta -> v.innerVal
+      case v => meta -> v.innerVal
     }
   }
 
-  lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
-  lazy val metas = for ((meta, v) <- propsWithTs if !ordersKeyMap.contains(meta)) yield meta -> v.innerVal
+  lazy val ordersKeyMap = orders.map { case (meta, _) => meta.name }.toSet
+  lazy val metas = for ((meta, v) <- propsWithTs.asScala if !ordersKeyMap.contains(meta)) yield v.labelMeta -> v.innerVal
 
 //  lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
 
@@ -135,12 +171,13 @@ case class IndexEdge(srcVertex: Vertex,
   lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
 
   def propsWithName = for {
-    (meta, v) <- propsWithTs if meta.seq >= 0
+    (_, v) <- propsWithTs.asScala
+    meta = v.labelMeta
     jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
   } yield meta.name -> jsValue
 
 
-  def toEdge: Edge = Edge(srcVertex, tgtVertex, label, direction, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+  def toEdge: Edge = Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
 
   // only for debug
   def toLogString() = {
@@ -152,52 +189,99 @@ case class IndexEdge(srcVertex: Vertex,
   }
 
   def property(labelMeta: LabelMeta): InnerValLikeWithTs = {
-    propsWithTs.get(labelMeta).getOrElse(label.metaPropsDefaultMapInner(labelMeta))
+//    propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(label.metaPropsDefaultMapInner(labelMeta))
+    if (propsWithTs.containsKey(labelMeta.name)) {
+      propsWithTs.get(labelMeta.name).innerValWithTs
+    } else {
+      label.metaPropsDefaultMapInner(labelMeta)
+    }
   }
 
-  def updatePropsWithTs(others: Map[LabelMeta, InnerValLikeWithTs] = Map.empty): Map[LabelMeta, InnerValLikeWithTs] = {
+  def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = {
     if (others.isEmpty) propsWithTs
-    else propsWithTs ++ others
+    else {
+      val iter = others.entrySet().iterator()
+      while (iter.hasNext) {
+        val e = iter.next()
+        propsWithTs.put(e.getKey, e.getValue)
+      }
+      propsWithTs
+    }
+  }
+
+  def property[V](key: String, value: V, ts: Long): S2Property[V] = {
+    val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge."))
+    val newProps = new S2Property(edge, labelMeta, key, value, ts)
+    propsWithTs.put(key, newProps)
+    newProps
+  }
+  override def hashCode(): Int = {
+    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId + "," + labelIndexSeq)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case e: IndexEdge =>
+      srcVertex.innerId == e.srcVertex.innerId &&
+        tgtVertex.innerId == e.tgtVertex.innerId &&
+        labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
+        labelIndexSeq == e.labelIndexSeq
+    case _ => false
+  }
+
+  override def toString(): String = {
+    Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> dir,
+      "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString
+    ).toString
   }
 }
 
-case class Edge(srcVertex: Vertex,
-                tgtVertex: Vertex,
+case class Edge(innerGraph: Graph,
+                srcVertex: Vertex,
+                var tgtVertex: Vertex,
                 innerLabel: Label,
                 dir: Int,
-                op: Byte = GraphUtil.defaultOpByte,
-                version: Long = System.currentTimeMillis(),
-                private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
+                var op: Byte = GraphUtil.defaultOpByte,
+                var version: Long = System.currentTimeMillis(),
+                propsWithTs: Props = Edge.EmptyProps,
                 parentEdges: Seq[EdgeWithScore] = Nil,
                 originalEdgeOpt: Option[Edge] = None,
                 pendingEdgeOpt: Option[Edge] = None,
                 statusCode: Byte = 0,
                 lockTs: Option[Long] = None,
-                tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge {
+                var tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge {
 
   lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir)
   lazy val schemaVer = innerLabel.schemaVersion
-  lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.value match {
+  lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match {
     case b: BigDecimal => b.longValue()
     case l: Long => l
     case i: Int => i.toLong
     case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].")
   }
 
-  //FIXME
+  lazy val operation = GraphUtil.fromOp(op)
   lazy val tsInnerVal = tsInnerValOpt.get.value
   lazy val srcId = srcVertex.innerIdVal
   lazy val tgtId = tgtVertex.innerIdVal
   lazy val labelName = innerLabel.label
   lazy val direction = GraphUtil.fromDirection(dir)
   
-  def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
+  def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
 
-  def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.map(kv => kv._1.seq -> kv._2).toSeq)
+  def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
 
-  def updatePropsWithTs(others: Map[LabelMeta, InnerValLikeWithTs] = Map.empty): Map[LabelMeta, InnerValLikeWithTs] = {
-    if (others.isEmpty) propsWithTs
-    else propsWithTs ++ others
+  def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = {
+    val emptyProp = Edge.EmptyProps
+
+    propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
+      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
+    })
+
+    others.forEach(new BiConsumer[String, S2Property[_]] {
+      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
+    })
+
+    emptyProp
   }
 
   def propertyValue(key: String): Option[InnerValLikeWithTs] = {
@@ -212,7 +296,12 @@ case class Edge(srcVertex: Vertex,
   }
 
   def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= {
-    propsWithTs.getOrElse(labelMeta, innerLabel.metaPropsDefaultMapInner(labelMeta))
+    //    propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
+    if (propsWithTs.containsKey(labelMeta.name)) {
+      propsWithTs.get(labelMeta.name).innerValWithTs
+    } else {
+      innerLabel.metaPropsDefaultMapInner(labelMeta)
+    }
   }
 
   def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
@@ -242,14 +331,21 @@ case class Edge(srcVertex: Vertex,
 
   lazy val properties = toProps()
 
-  def props = propsWithTs.mapValues(_.innerVal)
+  def props = propsWithTs.asScala.mapValues(_.innerVal)
 
 
   private def toProps(): Map[String, Any] = {
     for {
       (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner
     } yield {
-      labelMeta.name -> propsWithTs.getOrElse(labelMeta, defaultVal).innerVal.value
+      //      labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value
+      val value =
+        if (propsWithTs.containsKey(labelMeta.name)) {
+          propsWithTs.get(labelMeta.name).value
+        } else {
+          defaultVal.innerVal.value
+        }
+      labelMeta.name -> value
     }
   }
 
@@ -302,21 +398,21 @@ case class Edge(srcVertex: Vertex,
 
   override def isAsync = innerLabel.isAsync
 
-  def isDegree = propsWithTs.contains(LabelMeta.degree)
+  def isDegree = propsWithTs.containsKey(LabelMeta.degree.name)
 
 //  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
 //    case Some(_) => props
 //    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
 //  }
 
-  def propsPlusTsValid = propsWithTs.filter(kv => LabelMeta.isValidSeq(kv._1.seq))
+  def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
 
   def edgesWithIndex = for (labelOrder <- labelOrders) yield {
-    IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+    IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
   }
 
   def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
-    IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
+    IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
   }
 
   /** force direction as out on invertedEdge */
@@ -325,38 +421,28 @@ case class Edge(srcVertex: Vertex,
 
 //    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
 
-    val ret = SnapshotEdge(smaller, larger, innerLabel, GraphUtil.directions("out"), op, version,
-      Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs,
+    property(LabelMeta.timestamp.name, ts, ts)
+    val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel,
+      GraphUtil.directions("out"), op, version, propsWithTs,
       pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
     ret
   }
 
-  override def hashCode(): Int = {
-    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case e: Edge =>
-      srcVertex.innerId == e.srcVertex.innerId &&
-        tgtVertex.innerId == e.tgtVertex.innerId &&
-        labelWithDir == e.labelWithDir
-    case _ => false
-  }
-
   def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
     "label" -> innerLabel.label, "service" -> innerLabel.serviceName)
 
   def propsWithName =
     for {
-      (meta, v) <- props if meta.seq > 0
-      jsValue <- innerValToJsValue(v, meta.dataType)
+      (_, v) <- propsWithTs.asScala
+      meta = v.labelMeta
+      jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
     } yield meta.name -> jsValue
 
 
   def updateTgtVertex(id: InnerValLike) = {
     val newId = TargetVertexId(tgtVertex.id.colId, id)
     val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props)
-    Edge(srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+    Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
   }
 
   def rank(r: RankParam): Double =
@@ -364,17 +450,15 @@ case class Edge(srcVertex: Vertex,
     else {
       var sum: Double = 0
 
-      for ((seq, w) <- r.keySeqAndWeights) {
-        propsWithTs.get(seq) match {
-          case None => // do nothing
-          case Some(innerValWithTs) => {
-            val cost = try innerValWithTs.innerVal.toString.toDouble catch {
-              case e: Exception =>
-                logger.error("toInnerval failed in rank", e)
-                1.0
-            }
-            sum += w * cost
+      for ((labelMeta, w) <- r.keySeqAndWeights) {
+        if (propsWithTs.containsKey(labelMeta.name)) {
+          val innerValWithTs = propsWithTs.get(labelMeta.name)
+          val cost = try innerValWithTs.innerVal.toString.toDouble catch {
+            case e: Exception =>
+              logger.error("toInnerval failed in rank", e)
+              1.0
           }
+          sum += w * cost
         }
       }
       sum
@@ -385,23 +469,92 @@ case class Edge(srcVertex: Vertex,
     List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, allPropsWithName).mkString("\t")
   }
 
+  override def hashCode(): Int = {
+    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case e: Edge =>
+      srcVertex.innerId == e.srcVertex.innerId &&
+        tgtVertex.innerId == e.tgtVertex.innerId &&
+        labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
+        pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode &&
+        parentEdges == e.parentEdges && originalEdgeOpt == originalEdgeOpt
+    case _ => false
+  }
+
+  override def toString(): String = {
+    Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction,
+      "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
+      "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs
+    ).toString
+  }
+
+  def checkProperty(key: String): Boolean = propsWithTs.containsKey(key)
+
+  def copyEdge(srcVertex: Vertex = srcVertex,
+               tgtVertex: Vertex = tgtVertex,
+               innerLabel: Label = innerLabel,
+               dir: Int = dir,
+               op: Byte = op,
+               version: Long = version,
+               propsWithTs: State = Edge.propsToState(this.propsWithTs),
+               parentEdges: Seq[EdgeWithScore] = parentEdges,
+               originalEdgeOpt: Option[Edge] = originalEdgeOpt,
+               pendingEdgeOpt: Option[Edge] = pendingEdgeOpt,
+               statusCode: Byte = statusCode,
+               lockTs: Option[Long] = lockTs,
+               tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
+               ts: Long = ts): Edge = {
+    val edge = new Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps,
+      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+    Edge.fillPropsWithTs(edge, propsWithTs)
+    edge.property(LabelMeta.timestamp.name, ts, ts)
+    edge
+  }
+
+  def copyEdgeWithState(state: State, ts: Long): Edge = {
+    val newEdge = copy(propsWithTs = Edge.EmptyProps)
+    Edge.fillPropsWithTs(newEdge, state)
+    newEdge.property(LabelMeta.timestamp.name, ts, ts)
+    newEdge
+  }
+
+  def copyEdgeWithState(state: State): Edge = {
+    val newEdge = copy(propsWithTs = Edge.EmptyProps)
+    Edge.fillPropsWithTs(newEdge, state)
+    newEdge
+  }
+
   override def vertices(direction: Direction): util.Iterator[structure.Vertex] = ???
 
   override def properties[V](strings: String*): util.Iterator[Property[V]] = ???
 
-  override def property[V](key: String): Property[V] = ???
+  override def property[V](key: String): Property[V] = {
+    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
+    if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]]
+    else {
+      val default = innerLabel.metaPropsDefaultMapInner(labelMeta)
+      property(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]]
+    }
+  }
 
   override def property[V](key: String, value: V): Property[V] = {
     property(key, value, System.currentTimeMillis())
   }
 
-  def property[V](key: String, value: V, ts: Long): Property[V] = ???
+  def property[V](key: String, value: V, ts: Long): Property[V] = {
+    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
+    val newProp = new S2Property[V](this, labelMeta, key, value, ts)
+    propsWithTs.put(key, newProp)
+    newProp
+  }
 
-  override def remove(): Unit = ???
+  override def remove(): Unit = {}
 
-  override def graph(): TpGraph = ???
-  
-  override def id(): AnyRef = ???
+  override def graph(): TpGraph = innerGraph
+
+  override def id(): AnyRef = (srcVertex.innerId, labelWithDir, tgtVertex.innerId)
 
   override def label(): String = innerLabel.label
 }
@@ -425,38 +578,63 @@ object Edge {
   val incrementVersion = 1L
   val minTsVal = 0L
 
-  def toEdge(srcId: Any,
-                    tgtId: Any,
-                    labelName: String,
-                    direction: String,
-                    props: Map[String, Any] = Map.empty,
-                    ts: Long = System.currentTimeMillis(),
-                    operation: String = "insert"): Edge = {
-    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
-
-    val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion)
-    val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion)
+  /** now version information is required also **/
+  type Props = java.util.Map[String, S2Property[_]]
+  type State = Map[LabelMeta, InnerValLikeWithTs]
+  type PropsPairWithTs = (State, State, Long, String)
+  type MergeState = PropsPairWithTs => (State, Boolean)
+  type UpdateFunc = (Option[Edge], Edge, MergeState)
 
-    val srcColId = label.srcColumn.id.get
-    val tgtColId = label.tgtColumn.id.get
+  def EmptyProps = new java.util.HashMap[String, S2Property[_]]
+  def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs]
+  def sameProps(base: Props, other: Props): Boolean = {
+    if (base.size != other.size) false
+    else {
+      var ret = true
+      val iter = base.entrySet().iterator()
+      while (iter.hasNext) {
+        val e = iter.next()
+        if (!other.containsKey(e.getKey)) ret = false
+        else if (e.getValue != other.get(e.getKey)) ret = false
+        else {
 
-    val srcVertex = Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis())
-    val tgtVertex = Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis())
-    val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+        }
+      }
+      val otherIter = other.entrySet().iterator()
+      while (otherIter.hasNext) {
+        val e = otherIter.next()
+        if (!base.containsKey(e.getKey)) ret = false
+        else if (e.getValue != base.get(e.getKey)) ret = false
+        else {
 
-    val labelWithDir = LabelWithDirection(label.id.get, dir)
-    val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
-    val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
-    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+        }
+      }
+      ret
+    }
+//    base.sameElements(other)
+  }
+  def fillPropsWithTs(snapshotEdge: SnapshotEdge, state: State): Unit = {
+    state.foreach { case (k, v) => snapshotEdge.property(k.name, v.innerVal.value, v.ts) }
+  }
+  def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = {
+    state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) }
+  }
+  def fillPropsWithTs(edge: Edge, state: State): Unit = {
+    state.foreach { case (k, v) => edge.property(k.name, v.innerVal.value, v.ts) }
+  }
 
-    new Edge(srcVertex, tgtVertex, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
+  def propsToState(props: Props): State = {
+    props.asScala.map { case (k, v) =>
+      v.labelMeta -> v.innerValWithTs
+    }.toMap
   }
 
-  /** now version information is required also **/
-  type State = Map[LabelMeta, InnerValLikeWithTs]
-  type PropsPairWithTs = (State, State, Long, String)
-  type MergeState = PropsPairWithTs => (State, Boolean)
-  type UpdateFunc = (Option[Edge], Edge, MergeState)
+  def stateToProps(edge: Edge, state: State): Props = {
+    state.foreach { case (k, v) =>
+      edge.property(k.name, v.innerVal.value, v.ts)
+    }
+    edge.propsWithTs
+  }
 
   def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean =
     if (!props.contains(LabelMeta.lastDeletedAt)) false
@@ -467,6 +645,23 @@ object Edge {
       propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
     }
 
+  def allPropsDeleted(props: Props): Boolean =
+    if (!props.containsKey(LabelMeta.lastDeletedAt.name)) false
+    else {
+      val lastDeletedAt = props.get(LabelMeta.lastDeletedAt.name).ts
+      props.remove(LabelMeta.lastDeletedAt.name)
+//      val propsWithoutLastDeletedAt = props
+//
+//      propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
+      var ret = true
+      val iter = props.entrySet().iterator()
+      while (iter.hasNext) {
+        val e = iter.next()
+        if (e.getValue.ts > lastDeletedAt) ret = false
+      }
+      ret
+    }
+
   def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, EdgeMutate) = {
     //    assert(invertedEdge.isEmpty)
     //    assert(requestEdge.op == GraphUtil.operations("delete"))
@@ -481,7 +676,8 @@ object Edge {
     //            logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
     //            logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
     val oldPropsWithTs =
-      if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs] else invertedEdge.get.propsWithTs
+      if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs]
+      else propsToState(invertedEdge.get.propsWithTs)
 
     val funcs = requestEdges.map { edge =>
       if (edge.op == GraphUtil.operations("insert")) {
@@ -514,7 +710,7 @@ object Edge {
       for {
         (requestEdge, func) <- requestWithFuncs
       } {
-        val (_newPropsWithTs, _) = func(prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer)
+        val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer)
         prevPropsWithTs = _newPropsWithTs
         //        logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
       }
@@ -530,7 +726,9 @@ object Edge {
 
       //      logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
       //      logger.error(s"$propsWithTs")
-      (requestEdge.copy(propsWithTs = propsWithTs), edgeMutate)
+      val newEdge = requestEdge.copy(propsWithTs = EmptyProps)
+      fillPropsWithTs(newEdge, propsWithTs)
+      (newEdge, edgeMutate)
     }
   }
 
@@ -540,7 +738,7 @@ object Edge {
         // both direction use same indices that is defined when label creation.
         true
       case Some(dir) =>
-        if (dir != ie.direction) {
+        if (dir != ie.dir) {
           // current labelIndex's direction is different with indexEdge's direction so don't touch
           false
         } else {
@@ -566,13 +764,14 @@ object Edge {
       val newOp = snapshotEdgeOpt match {
         case None => requestEdge.op
         case Some(old) =>
-          val oldMaxTs = old.propsWithTs.map(_._2.ts).max
+          val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max
           if (oldMaxTs > requestEdge.ts) old.op
           else requestEdge.op
       }
 
-      val newSnapshotEdgeOpt =
-        Option(requestEdge.copy(op = newOp, propsWithTs = newPropsWithTs, version = newVersion).toSnapshotEdge)
+      val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs)
+
+      val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge)
       // delete request must always update snapshot.
       if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) {
         // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt.
@@ -587,12 +786,17 @@ object Edge {
 
         val edgesToInsert =
           if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
-          else
-            requestEdge.copy(
+          else {
+            val newEdge = requestEdge.copy(
               version = newVersion,
-              propsWithTs = newPropsWithTs,
+              propsWithTs = Edge.EmptyProps,
               op = GraphUtil.defaultOpByte
-            ).relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
+            )
+            newPropsWithTs.foreach { case (k, v) => newEdge.property(k.name, v.innerVal.value, v.ts) }
+
+            newEdge.relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
+          }
+
 
         EdgeMutate(edgesToDelete = edgesToDelete,
           edgesToInsert = edgesToInsert,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index a2b17ef..ec3f286 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -19,10 +19,11 @@
 
 package org.apache.s2graph.core
 
+import java.util
 import java.util.concurrent.Executors
 
 import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.hadoop.fs.Path
+import org.apache.commons.configuration.Configuration
 import org.apache.s2graph.core.GraphExceptions.{FetchAllStepFailException, FetchTimeoutException, LabelNotExistException}
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Model, Service}
@@ -30,8 +31,11 @@ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.storage.{SKeyValue, Storage}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.{DeferCache, Extensions, SafeUpdateCache, logger}
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
+import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.Graph.Variables
+import org.apache.tinkerpop.gremlin.structure.{Graph => TpGraph, Transaction}
 import play.api.libs.json.{JsObject, Json}
-
 import scala.annotation.tailrec
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -83,62 +87,7 @@ object Graph {
 
   var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
 
-  def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
-    val parts = GraphUtil.split(s)
-    val logType = parts(2)
-    val element = if (logType == "edge" | logType == "e") {
-      /** current only edge is considered to be bulk loaded */
-      labelMapping.get(parts(5)) match {
-        case None =>
-        case Some(toReplace) =>
-          parts(5) = toReplace
-      }
-      toEdge(parts)
-    } else if (logType == "vertex" | logType == "v") {
-      toVertex(parts)
-    } else {
-      throw new GraphExceptions.JsonParseException("log type is not exist in log.")
-    }
 
-    element
-  } recover {
-    case e: Exception =>
-      logger.error(s"[toElement]: $e", e)
-      None
-  } get
-
-
-  def toVertex(s: String): Option[Vertex] = {
-    toVertex(GraphUtil.split(s))
-  }
-
-  def toEdge(s: String): Option[Edge] = {
-    toEdge(GraphUtil.split(s))
-  }
-
-  def toEdge(parts: Array[String]): Option[Edge] = Try {
-    val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
-    val tempDirection = if (parts.length >= 8) parts(7) else "out"
-    val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
-    val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
-    Option(edge)
-  } recover {
-    case e: Exception =>
-      logger.error(s"[toEdge]: $e", e)
-      throw e
-  } get
-
-  def toVertex(parts: Array[String]): Option[Vertex] = Try {
-    val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
-    val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
-    Option(vertex)
-  } recover {
-    case e: Throwable =>
-      logger.error(s"[toVertex]: $e", e)
-      throw e
-  } get
 
   def initStorage(graph: Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = {
     val storageBackend = config.getString("s2graph.storage.backend")
@@ -326,7 +275,9 @@ object Graph {
             /** Select */
             val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
 
-            val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+//            val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+            val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
+
             val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
             /** OrderBy */
             val orderByValues =
@@ -410,7 +361,7 @@ object Graph {
             edge.propertyValues(queryOption.selectColumns) ++ initial
           }
 
-        val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+        val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
         edgeWithScore.copy(edge = newEdge)
       }
     } else Nil
@@ -544,7 +495,7 @@ object Graph {
 
 }
 
-class Graph(_config: Config)(implicit val ec: ExecutionContext) {
+class Graph(_config: Config)(implicit val ec: ExecutionContext) extends TpGraph {
 
   import Graph._
 
@@ -948,20 +899,28 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
       val head = filtered.head
       val label = head.edge.innerLabel
       val edgeWithScoreLs = filtered.map { edgeWithScore =>
-        val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
-          case "strong" =>
-            val _newPropsWithTs = edgeWithScore.edge.updatePropsWithTs(
-              Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
-            )
-
-            (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
-          case _ =>
-            val oldEdge = edgeWithScore.edge
-            (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs())
-        }
-
-        val copiedEdge =
-          edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
+          val edge = edgeWithScore.edge
+          val copiedEdge = label.consistencyLevel match {
+            case "strong" =>
+              edge.copyEdge(op = GraphUtil.operations("delete"),
+                version = requestTs, propsWithTs = Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
+            case _ =>
+              edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
+          }
+//        val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
+//          case "strong" =>
+//            val edge = edgeWithScore.edge
+//            edge.property(LabelMeta.timestamp.name, requestTs)
+//            val _newPropsWithTs = edge.updatePropsWithTs()
+//
+//            (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
+//          case _ =>
+//            val oldEdge = edgeWithScore.edge
+//            (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs())
+//        }
+//
+//        val copiedEdge =
+//          edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
 
         val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
         //      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
@@ -1099,7 +1058,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
               operation: String = "insert",
               withWait: Boolean = true): Future[Boolean] = {
 
-    val innerEdges = Seq(Edge.toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation))
+    val innerEdges = Seq(toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation))
     mutateEdges(innerEdges, withWait).map(_.headOption.getOrElse(false))
   }
 
@@ -1113,4 +1072,141 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
     val innerVertices = Seq(Vertex.toVertex(serviceName, columnName, id, props.toMap, ts, operation))
     mutateVertices(innerVertices, withWait).map(_.headOption.getOrElse(false))
   }
+
+  def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
+    val parts = GraphUtil.split(s)
+    val logType = parts(2)
+    val element = if (logType == "edge" | logType == "e") {
+      /** current only edge is considered to be bulk loaded */
+      labelMapping.get(parts(5)) match {
+        case None =>
+        case Some(toReplace) =>
+          parts(5) = toReplace
+      }
+      toEdge(parts)
+    } else if (logType == "vertex" | logType == "v") {
+      toVertex(parts)
+    } else {
+      throw new GraphExceptions.JsonParseException("log type is not exist in log.")
+    }
+
+    element
+  } recover {
+    case e: Exception =>
+      logger.error(s"[toElement]: $e", e)
+      None
+  } get
+
+
+  def toVertex(s: String): Option[Vertex] = {
+    toVertex(GraphUtil.split(s))
+  }
+
+  def toEdge(s: String): Option[Edge] = {
+    toEdge(GraphUtil.split(s))
+  }
+
+  def toEdge(parts: Array[String]): Option[Edge] = Try {
+    val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
+    val tempDirection = if (parts.length >= 8) parts(7) else "out"
+    val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
+    val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
+    Option(edge)
+  } recover {
+    case e: Exception =>
+      logger.error(s"[toEdge]: $e", e)
+      throw e
+  } get
+
+  def toVertex(parts: Array[String]): Option[Vertex] = Try {
+    val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+    val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
+    val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
+    Option(vertex)
+  } recover {
+    case e: Throwable =>
+      logger.error(s"[toVertex]: $e", e)
+      throw e
+  } get
+
+  def newSnapshotEdge(srcVertex: Vertex,
+                      tgtVertex: Vertex,
+                      label: Label,
+                      dir: Int,
+                      op: Byte,
+                      version: Long,
+                      propsWithTs: Edge.State,
+                      pendingEdgeOpt: Option[Edge],
+                      statusCode: Byte = 0,
+                      lockTs: Option[Long],
+                      tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
+    val snapshotEdge = new SnapshotEdge(this, srcVertex, tgtVertex, label, dir, op, version, Edge.EmptyProps,
+      pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+    Edge.fillPropsWithTs(snapshotEdge, propsWithTs)
+    snapshotEdge
+  }
+
+  def newEdge(srcVertex: Vertex,
+              tgtVertex: Vertex,
+              innerLabel: Label,
+              dir: Int,
+              op: Byte = GraphUtil.defaultOpByte,
+              version: Long = System.currentTimeMillis(),
+              propsWithTs: Edge.State,
+              parentEdges: Seq[EdgeWithScore] = Nil,
+              originalEdgeOpt: Option[Edge] = None,
+              pendingEdgeOpt: Option[Edge] = None,
+              statusCode: Byte = 0,
+              lockTs: Option[Long] = None,
+              tsInnerValOpt: Option[InnerValLike] = None): Edge = {
+    val edge = new Edge(this, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps,
+      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+    Edge.fillPropsWithTs(edge, propsWithTs)
+    edge
+  }
+  def toEdge(srcId: Any,
+             tgtId: Any,
+             labelName: String,
+             direction: String,
+             props: Map[String, Any] = Map.empty,
+             ts: Long = System.currentTimeMillis(),
+             operation: String = "insert"): Edge = {
+    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+
+    val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion)
+    val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion)
+
+    val srcColId = label.srcColumn.id.get
+    val tgtColId = label.tgtColumn.id.get
+
+    val srcVertex = Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis())
+    val tgtVertex = Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis())
+    val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+
+    val labelWithDir = LabelWithDirection(label.id.get, dir)
+    val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
+    val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+    new Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs)
+  }
+
+  override def vertices(objects: AnyRef*): util.Iterator[structure.Vertex] = ???
+
+  override def tx(): Transaction = ???
+
+  override def edges(objects: AnyRef*): util.Iterator[structure.Edge] = ???
+
+  override def variables(): Variables = ???
+
+  override def configuration(): Configuration = ???
+
+  override def addVertex(objects: AnyRef*): structure.Vertex = ???
+
+  override def close(): Unit = ???
+
+  override def compute[C <: GraphComputer](aClass: Class[C]): C = ???
+
+  override def compute(): GraphComputer = ???
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 7b10709..170fd0b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -376,9 +376,9 @@ case class QueryParam(labelName: String,
 
           val propVal =
             if (InnerVal.isNumericType(labelMeta.dataType)) {
-              InnerVal.withLong(edge.props(labelMeta).value.asInstanceOf[BigDecimal].longValue() + padding, label.schemaVersion)
+              InnerVal.withLong(edge.property(labelMeta.name).value.asInstanceOf[BigDecimal].longValue() + padding, label.schemaVersion)
             } else {
-              edge.props(labelMeta)
+              edge.property(labelMeta.name).asInstanceOf[S2Property[_]].innerVal
             }
 
           labelMeta -> propVal

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index d8416c2..3753d0f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -40,7 +40,7 @@ object QueryResult {
       val edgeWithScores = for {
         vertex <- query.vertices
       } yield {
-          val edge = Edge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
+          val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
           val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore, queryParam.label)
           edgeWithScore
         }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index 67a9d4c..938a9bb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -1,29 +1,48 @@
 package org.apache.s2graph.core
 
-import org.apache.hadoop.hbase.util.Bytes
+
 import org.apache.s2graph.core.mysqls.LabelMeta
-import org.apache.s2graph.core.types.CanInnerValLike
-import org.apache.tinkerpop.gremlin.structure.{Element, Property}
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, CanInnerValLike}
+import org.apache.tinkerpop.gremlin.structure.{Property}
+
+import scala.util.hashing.MurmurHash3
 
 
-case class S2Property[V](element: Element,
+case class S2Property[V](element: Edge,
                          labelMeta: LabelMeta,
                          key: String,
                          value: V,
-                         ts: Long = System.currentTimeMillis()) extends Property[V] {
+                         ts: Long) extends Property[V] {
 
   import CanInnerValLike._
-  lazy val innerVal = anyToInnerValLike.toInnerVal(value, labelMeta.label.schemaVersion)
+  lazy val innerVal = anyToInnerValLike.toInnerVal(value)(element.innerLabel.schemaVersion)
+  lazy val innerValWithTs = InnerValLikeWithTs(innerVal, ts)
 
   def bytes: Array[Byte] = {
     innerVal.bytes
   }
 
   def bytesWithTs: Array[Byte] = {
-    Bytes.add(innerVal.bytes, Bytes.toBytes(ts))
+    innerValWithTs.bytes
   }
 
   override def isPresent: Boolean = ???
 
   override def remove(): Unit = ???
+
+  override def hashCode(): Int = {
+    MurmurHash3.stringHash(labelMeta.labelId + "," + labelMeta.id.get + "," + key + "," + value + "," + ts)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case p: S2Property[_] =>
+      labelMeta.labelId == p.labelMeta.labelId &&
+      labelMeta.seq == p.labelMeta.seq &&
+      key == p.key && value == p.value && ts == p.ts
+    case _ => false
+  }
+
+  override def toString(): String = {
+    Map("labelMeta" -> labelMeta.toString, "key" -> key, "value" -> value, "ts" -> ts).toString
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
index bbd71ec..0ff4f98 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
@@ -19,15 +19,21 @@
 
 package org.apache.s2graph.core
 
+import java.util
+
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId}
+import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
+import org.apache.tinkerpop.gremlin.structure.{Vertex => TpVertex, Direction, Edge, VertexProperty, Graph}
 import play.api.libs.json.Json
+
 case class Vertex(id: VertexId,
                   ts: Long = System.currentTimeMillis(),
                   props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
                   op: Byte = 0,
-                  belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement {
+                  belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement with TpVertex {
 
   val innerId = id.innerId
 
@@ -97,6 +103,22 @@ case class Vertex(id: VertexId,
     else
       Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName).mkString("\t")
   }
+
+  override def vertices(direction: Direction, strings: String*): util.Iterator[TpVertex] = ???
+
+  override def edges(direction: Direction, strings: String*): util.Iterator[structure.Edge] = ???
+
+  override def property[V](cardinality: Cardinality, s: String, v: V, objects: AnyRef*): VertexProperty[V] = ???
+
+  override def addEdge(s: String, vertex: TpVertex, objects: AnyRef*): Edge = ???
+
+  override def properties[V](strings: String*): util.Iterator[VertexProperty[V]] = ???
+
+  override def remove(): Unit = ???
+
+  override def graph(): Graph = ???
+
+  override def label(): String = ???
 }
 
 object Vertex {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 8baf787..805a544 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -541,7 +541,7 @@ class RequestParser(graph: Graph) {
     val elementsWithTsv = for {
       edgeStr <- edgeStrs
       str <- GraphUtil.parseString(edgeStr)
-      element <- Graph.toGraphElement(str)
+      element <- graph.toGraphElement(str)
     } yield (element, str)
 
     elementsWithTsv
@@ -566,7 +566,7 @@ class RequestParser(graph: Graph) {
       tgtId <- tgtIds.flatMap(jsValueToAny(_).toSeq)
     } yield {
       //      val edge = Management.toEdge(graph, timestamp, operation, srcId, tgtId, label, direction, fromJsonToProperties(propsJson))
-      val edge = Edge.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation)
+      val edge = graph.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation)
       val tsv = (jsValue \ "direction").asOpt[String] match {
         case None => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString).mkString("\t")
         case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString, dir).mkString("\t")
@@ -690,7 +690,7 @@ class RequestParser(graph: Graph) {
       labelName <- (json \ "label").asOpt[String]
       direction = (json \ "direction").asOpt[String].getOrElse("out")
     } yield {
-      Edge.toEdge(from, to, labelName, direction, Map.empty)
+      graph.toEdge(from, to, labelName, direction, Map.empty)
     }
   }
 
@@ -700,7 +700,7 @@ class RequestParser(graph: Graph) {
     for {
       edgeStr <- edgeStrs
       str <- GraphUtil.parseString(edgeStr)
-      element <- Graph.toGraphElement(str)
+      element <- graph.toGraphElement(str)
     } yield element
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index b1ef11d..26d6ad1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -113,20 +113,20 @@ abstract class Storage[Q, R](val graph: Graph,
    * */
 
   val snapshotEdgeDeserializers: Map[String, Deserializable[SnapshotEdge]] = Map(
-    VERSION1 -> new SnapshotEdgeDeserializable,
-    VERSION2 -> new SnapshotEdgeDeserializable,
-    VERSION3 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable,
-    VERSION4 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable
+    VERSION1 -> new SnapshotEdgeDeserializable(graph),
+    VERSION2 -> new SnapshotEdgeDeserializable(graph),
+    VERSION3 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph),
+    VERSION4 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph)
   )
   def snapshotEdgeDeserializer(schemaVer: String) =
     snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
 
   /** create deserializer that can parse stored CanSKeyValue into indexEdge. */
-  val indexEdgeDeserializers: Map[String, Deserializable[IndexEdge]] = Map(
-    VERSION1 -> new IndexEdgeDeserializable,
-    VERSION2 -> new IndexEdgeDeserializable,
-    VERSION3 -> new IndexEdgeDeserializable,
-    VERSION4 -> new serde.indexedge.tall.IndexEdgeDeserializable
+  val indexEdgeDeserializers: Map[String, Deserializable[Edge]] = Map(
+    VERSION1 -> new IndexEdgeDeserializable(graph),
+    VERSION2 -> new IndexEdgeDeserializable(graph),
+    VERSION3 -> new IndexEdgeDeserializable(graph),
+    VERSION4 -> new serde.indexedge.tall.IndexEdgeDeserializable(graph)
   )
 
   def indexEdgeDeserializer(schemaVer: String) =
@@ -795,17 +795,25 @@ abstract class Storage[Q, R](val graph: Graph,
       } yield {
           val edge = edgeWithScore.edge
           val score = edgeWithScore.score
-          /** reverted direction */
-          val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+
+          val edgeSnapshot = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()))
+          val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+          val edgeForward = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()))
+          val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
             indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
               buildIncrementsAsync(indexEdge, -1L)
           }
-          val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-          val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
+
+          /** reverted direction */
+          val edgeRevert = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()))
+          val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
             indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
               buildIncrementsAsync(indexEdge, -1L)
           }
+
           val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
           writeToStorage(zkQuorum, mutations, withWait = true)
         }
 
@@ -821,7 +829,7 @@ abstract class Storage[Q, R](val graph: Graph,
   /** Parsing Logic: parse from kv from Storage into Edge */
   def toEdge[K: CanSKeyValue](kv: K,
                               queryRequest: QueryRequest,
-                              cacheElementOpt: Option[IndexEdge],
+                              cacheElementOpt: Option[Edge],
                               parentEdges: Seq[EdgeWithScore]): Option[Edge] = {
     logger.debug(s"toEdge: $kv")
 
@@ -830,8 +838,8 @@ abstract class Storage[Q, R](val graph: Graph,
       val queryParam = queryRequest.queryParam
       val schemaVer = queryParam.label.schemaVersion
       val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
-      if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges))
-      else indexEdgeOpt.map(indexEdge => indexEdge.toEdge)
+      if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copy(parentEdges = parentEdges))
+      else indexEdgeOpt
     } catch {
       case ex: Exception =>
         logger.error(s"Fail on toEdge: ${kv.toString}, ${queryRequest}", ex)
@@ -898,7 +906,7 @@ abstract class Storage[Q, R](val graph: Graph,
       val (degreeEdges, keyValues) = cacheElementOpt match {
         case None => (Nil, kvs)
         case Some(cacheElement) =>
-          val head = cacheElement.toEdge
+          val head = cacheElement
           if (!head.isDegree) (Nil, kvs)
           else (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail)
       }
@@ -968,13 +976,13 @@ abstract class Storage[Q, R](val graph: Graph,
         val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, src), TargetVertexId(tgtColumn.id.get, tgt))
         val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
 
-        Edge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
+        graph.newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
       case None =>
         val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
         val srcVId = SourceVertexId(srcColumn.id.get, src)
         val srcV = Vertex(srcVId)
 
-        Edge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
+        graph.newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
     }
   }
 
@@ -1075,13 +1083,15 @@ abstract class Storage[Q, R](val graph: Graph,
 
   /** IndexEdge */
   def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
-    val newProps = indexedEdge.updatePropsWithTs(Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)))
+    val newProps = indexedEdge.updatePropsWithTs()
+    newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts))
     val _indexedEdge = indexedEdge.copy(propsWithTs = newProps)
     indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
   }
 
   def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
-    val newProps = indexedEdge.updatePropsWithTs(Map(LabelMeta.count -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)))
+    val newProps = indexedEdge.updatePropsWithTs()
+    newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts))
     val _indexedEdge = indexedEdge.copy(propsWithTs = newProps)
     indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
   }
@@ -1109,10 +1119,8 @@ abstract class Storage[Q, R](val graph: Graph,
   }
 
   def buildDegreePuts(edge: Edge, degreeVal: Long): Seq[SKeyValue] = {
-    val kvs = edge.edgesWithIndexValid.flatMap { _indexEdge =>
-      val newProps = Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, _indexEdge.ts, _indexEdge.schemaVer))
-      val indexEdge = _indexEdge.copy(propsWithTs = newProps)
-
+    edge.property(LabelMeta.degree.name, degreeVal, edge.ts)
+    val kvs = edge.edgesWithIndexValid.flatMap { indexEdge =>
       indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 2428173..c538e53 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -25,13 +25,14 @@ import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex}
+import org.apache.s2graph.core._
 import scala.collection.immutable
 
 object IndexEdgeDeserializable{
-  def getNewInstance() = new IndexEdgeDeserializable()
+  def getNewInstance(graph: Graph) = new IndexEdgeDeserializable(graph)
 }
-class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
+class IndexEdgeDeserializable(graph: Graph,
+                              bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[Edge] {
    import StorageDeserializable._
 
    type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
@@ -40,7 +41,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
    override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
                                                     _kvs: Seq[T],
                                                     schemaVer: String,
-                                                    cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+                                                    cacheElementOpt: Option[Edge]): Edge = {
 
      assert(_kvs.size == 1)
 
@@ -59,19 +60,25 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
      pos += 1
 
      val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
-//     val op = kv.row(pos)
-//     pos += 1
+
+     val srcVertex = Vertex(srcVertexId, version)
+     //TODO:
+     val edge = graph.newEdge(srcVertex, null,
+       label, labelWithDir.dir, GraphUtil.defaultOpByte, version, Edge.EmptyState)
+     var tsVal = version
 
      if (pos == kv.row.length) {
        // degree
        //      val degreeVal = Bytes.toLong(kv.value)
        val degreeVal = bytesToLongFunc(kv.value, 0)
-       val ts = kv.timestamp
-       val tsInnerValLikeWithTs = InnerValLikeWithTs.withLong(ts, ts, schemaVer)
-       val props = Map(LabelMeta.timestamp -> tsInnerValLikeWithTs,
-         LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer))
        val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
-       IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), label, labelWithDir.dir, GraphUtil.defaultOpByte, ts, labelIdxSeq, props,  tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
+
+       edge.property(LabelMeta.timestamp.name, version, version)
+       edge.property(LabelMeta.degree.name, degreeVal, version)
+       edge.tgtVertex = Vertex(tgtVertexId, version)
+       edge.op = GraphUtil.defaultOpByte
+       edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+       edge
      } else {
        // not degree edge
        val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
@@ -85,60 +92,47 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
        }
        val op = kv.row(kv.row.length-1)
 
-       val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs]
        val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
-
        /** process indexProps */
        val size = idxPropsRaw.length
        (0 until size).foreach { ith =>
          val meta = index.sortKeyTypesArray(ith)
          val (k, v) = idxPropsRaw(ith)
-         if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
-         else allProps += meta -> InnerValLikeWithTs(v, version)
+         if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue()
+
+         if (k == LabelMeta.degree) {
+           edge.property(LabelMeta.degree.name, v.value, version)
+         } else {
+           edge.property(meta.name, v.value, version)
+         }
        }
-//       for {
-//         (meta, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw)
-//       } {
-//         if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
-//         else {
-//           allProps += meta -> InnerValLikeWithTs(v, version)
-//         }
-//       }
 
        /** process props */
        if (op == GraphUtil.operations("incrementCount")) {
          //        val countVal = Bytes.toLong(kv.value)
          val countVal = bytesToLongFunc(kv.value, 0)
-         allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+         edge.property(LabelMeta.count.name, countVal, version)
        } else {
          val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label)
          props.foreach { case (k, v) =>
-           allProps += (k -> InnerValLikeWithTs(v, version))
+           if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue()
+
+           edge.property(k.name, v.value, version)
          }
        }
-       val _mergedProps = allProps.result()
-       val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match {
-         case None =>
-           val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer)
-           val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-           (mergedProps, tsInnerVal)
-         case Some(tsInnerVal) =>
-           (_mergedProps, tsInnerVal)
-       }
-//       val mergedProps =
-//         if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps
-//         else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
 
        /** process tgtVertexId */
        val tgtVertexId =
-         mergedProps.get(LabelMeta.to) match {
-           case None => tgtVertexIdRaw
-           case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
-         }
-
+         if (edge.checkProperty(LabelMeta.to.name)) {
+           val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
+           TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+         } else tgtVertexIdRaw
 
-       IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
 
+       edge.tgtVertex = Vertex(tgtVertexId, version)
+       edge.op = op
+       edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+       edge
      }
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 534667b..2b620a1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -23,10 +23,11 @@ import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex}
+import org.apache.s2graph.core._
 import scala.collection.immutable
 
-class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
+class IndexEdgeDeserializable(graph: Graph,
+                              bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[Edge] {
    import StorageDeserializable._
 
    type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
@@ -67,7 +68,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
    override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
                                                     _kvs: Seq[T],
                                                     schemaVer: String,
-                                                    cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+                                                    cacheElementOpt: Option[Edge]): Edge = {
      assert(_kvs.size == 1)
 
 //     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
@@ -75,17 +76,22 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
      val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
      val version = kv.timestamp
 
-     val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
-       (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
-     }.getOrElse(parseRow(kv, schemaVer))
+//     val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
+//       (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+//     }.getOrElse(parseRow(kv, schemaVer))
+     val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = parseRow(kv, schemaVer)
 
      val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
+     val srcVertex = Vertex(srcVertexId, version)
+     //TODO:
+     val edge = graph.newEdge(srcVertex, null,
+       label, labelWithDir.dir, GraphUtil.defaultOpByte, version, Edge.EmptyState)
+     var tsVal = version
 
      val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
        if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
        else parseQualifier(kv, schemaVer)
 
-     val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs]
      val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
 
      /** process indexProps */
@@ -93,52 +99,38 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
      (0 until size).foreach { ith =>
        val meta = index.sortKeyTypesArray(ith)
        val (k, v) = idxPropsRaw(ith)
-       if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
-       else allProps += meta -> InnerValLikeWithTs(v, version)
+       if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue()
+
+       if (k == LabelMeta.degree) {
+         edge.property(LabelMeta.degree.name, v.value, version)
+       } else {
+         edge.property(meta.name, v.value, version)
+       }
      }
-//     for {
-//       (seq, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw)
-//     } {
-//       if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
-//       else allProps += seq -> InnerValLikeWithTs(v, version)
-//     }
 
      /** process props */
      if (op == GraphUtil.operations("incrementCount")) {
-       //      val countVal = Bytes.toLong(kv.value)
+       //        val countVal = Bytes.toLong(kv.value)
        val countVal = bytesToLongFunc(kv.value, 0)
-       allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
-     } else if (kv.qualifier.isEmpty) {
-       val countVal = bytesToLongFunc(kv.value, 0)
-       allProps += (LabelMeta.degree -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+       edge.property(LabelMeta.count.name, countVal, version)
      } else {
-       val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label)
+       val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label)
        props.foreach { case (k, v) =>
-         allProps += (k -> InnerValLikeWithTs(v, version))
-       }
-     }
+         if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue()
 
-     val _mergedProps = allProps.result()
-     val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match {
-       case None =>
-         val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer)
-         val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-         (mergedProps, tsInnerVal)
-       case Some(tsInnerVal) =>
-         (_mergedProps, tsInnerVal)
+         edge.property(k.name, v.value, version)
+       }
      }
-//     val mergedProps =
-//       if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps
-//            else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-
      /** process tgtVertexId */
      val tgtVertexId =
-       mergedProps.get(LabelMeta.to) match {
-         case None => tgtVertexIdRaw
-         case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
-       }
-
-     IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
-
+       if (edge.checkProperty(LabelMeta.to.name)) {
+         val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
+         TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+       } else tgtVertexIdRaw
+
+     edge.tgtVertex = Vertex(tgtVertexId, version)
+     edge.op = op
+     edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+     edge
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index 91b8db1..37aafcf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -24,9 +24,9 @@ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
 import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId}
-import org.apache.s2graph.core.{Edge, SnapshotEdge, Vertex}
+import org.apache.s2graph.core.{Graph, Edge, SnapshotEdge, Vertex}
 
-class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
+class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEdge] {
 
   def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
     val statusCode = byte >> 4
@@ -87,7 +87,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
           val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
 
           val pendingEdge =
-            Edge(Vertex(srcVertexId, cellVersion),
+            graph.newEdge(Vertex(srcVertexId, cellVersion),
               Vertex(tgtVertexId, cellVersion),
               label, labelWithDir.dir, pendingEdgeOp,
               cellVersion, pendingEdgeProps.toMap,
@@ -98,7 +98,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
       (kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
     }
 
-    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+    graph.newSnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
       label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
       pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
   }