You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/11/19 02:28:47 UTC

[04/23] incubator-s2graph git commit: passed s2tests.

passed s2tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/87394b9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/87394b9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/87394b9f

Branch: refs/heads/master
Commit: 87394b9f7b7e241642201460a460ba2403a0fb99
Parents: 7413aad
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 3 15:28:08 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Nov 3 15:28:08 2017 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/QueryParam.scala    | 10 +--
 .../org/apache/s2graph/core/QueryResult.scala   |  2 +-
 .../scala/org/apache/s2graph/core/S2Edge.scala  | 68 +++++++++-----------
 .../org/apache/s2graph/core/S2EdgeLike.scala    | 54 +++++++++++++---
 .../scala/org/apache/s2graph/core/S2Graph.scala | 50 +++++++-------
 .../org/apache/s2graph/core/S2Property.scala    |  2 +-
 .../org/apache/s2graph/core/S2VertexLike.scala  |  2 +-
 .../s2graph/core/index/IndexProvider.scala      | 12 ++--
 .../s2graph/core/parsers/WhereParser.scala      | 34 +++++-----
 .../s2graph/core/rest/RequestParser.scala       |  4 +-
 .../apache/s2graph/core/storage/Storage.scala   | 10 +--
 .../apache/s2graph/core/storage/StorageIO.scala | 20 +++---
 .../s2graph/core/storage/StorageReadable.scala  |  6 +-
 .../storage/WriteWriteConflictResolver.scala    | 34 +++++-----
 .../hbase/AsynchbaseStorageReadable.scala       |  6 +-
 .../tall/IndexEdgeDeserializable.scala          | 17 +++--
 .../wide/IndexEdgeDeserializable.scala          | 12 ++--
 .../tall/SnapshotEdgeSerializable.scala         |  2 +-
 .../wide/SnapshotEdgeSerializable.scala         |  2 +-
 .../s2graph/core/parsers/WhereParserTest.scala  |  2 +-
 .../core/tinkerpop/S2GraphProvider.scala        |  1 +
 .../rest/play/controllers/EdgeController.scala  |  6 +-
 22 files changed, 195 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 2e8d1f4..e98ef37 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -163,7 +163,7 @@ case class EdgeTransformer(jsValue: JsValue) {
     }
   }
 
-  def toInnerValOpt(queryParam: QueryParam, edge: S2Edge, fieldName: String): Option[InnerValLike] = {
+  def toInnerValOpt(queryParam: QueryParam, edge: S2EdgeLike, fieldName: String): Option[InnerValLike] = {
     fieldName match {
       case LabelMeta.to.name => Option(edge.tgtVertex.innerId)
       case LabelMeta.from.name => Option(edge.srcVertex.innerId)
@@ -171,7 +171,7 @@ case class EdgeTransformer(jsValue: JsValue) {
     }
   }
 
-  def transform(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = {
+  def transform(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = {
     if (isDefault) Seq(edge)
     else {
       val edges = for {
@@ -311,7 +311,7 @@ case class QueryParam(labelName: String,
     CanInnerValLike.anyToInnerValLike.toInnerVal(id)(label.tgtColumnWithDir(dir).schemaVersion)
   }
 
-  def buildInterval(edgeOpt: Option[S2Edge]) = intervalOpt match {
+  def buildInterval(edgeOpt: Option[S2EdgeLike]) = intervalOpt match {
     case None => Array.empty[Byte] -> Array.empty[Byte]
     case Some(interval) =>
       val (froms, tos) = interval
@@ -359,7 +359,7 @@ case class QueryParam(labelName: String,
     Bytes.add(bytes, optionalCacheKey)
   }
 
-  private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[S2Edge]): Seq[(LabelMeta, InnerValLike)] = {
+  private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[S2EdgeLike]): Seq[(LabelMeta, InnerValLike)] = {
     kvs.map { case (propKey, propValJs) =>
       propValJs match {
         case JsString(in) if edgeOpt.isDefined && in.contains("_parent.") =>
@@ -392,7 +392,7 @@ case class QueryParam(labelName: String,
     }
   }
 
-  def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[S2Edge] = None) = {
+  def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[S2EdgeLike] = None) = {
     val fromInnerVal = convertToInner(froms, edgeOpt)
     val toInnerVal = convertToInner(tos, edgeOpt)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 7506b40..9bd3cdb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -91,7 +91,7 @@ object WithScore {
   }
 }
 
-case class EdgeWithScore(edge: S2Edge,
+case class EdgeWithScore(edge: S2EdgeLike,
                          score: Double,
                          label: Label,
                          orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 97abd26..3529991 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -71,7 +71,7 @@ case class SnapshotEdge(graph: S2Graph,
                         op: Byte,
                         version: Long,
                         private val propsWithTs: Props,
-                        pendingEdgeOpt: Option[S2Edge],
+                        pendingEdgeOpt: Option[S2EdgeLike],
                         statusCode: Byte = 0,
                         lockTs: Option[Long],
                         tsInnerValOpt: Option[InnerValLike] = None) {
@@ -89,7 +89,7 @@ case class SnapshotEdge(graph: S2Graph,
 
   def allPropsDeleted = S2Edge.allPropsDeleted(propsWithTs)
 
-  def toEdge: S2Edge = {
+  def toEdge: S2EdgeLike = {
     S2Edge(graph, srcVertex, tgtVertex, label, dir, op,
       version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt,
       statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
@@ -243,7 +243,7 @@ case class IndexEdge(graph: S2Graph,
   } yield meta.name -> jsValue
 
 
-  def toEdge: S2Edge = S2Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+  def toEdge: S2EdgeLike = S2Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
 
   // only for debug
   def toLogString() = {
@@ -310,11 +310,11 @@ case class S2Edge(override val innerGraph: S2Graph,
                   var version: Long = System.currentTimeMillis(),
                   override val propsWithTs: Props = S2Edge.EmptyProps,
                   override val parentEdges: Seq[EdgeWithScore] = Nil,
-                  override val originalEdgeOpt: Option[S2Edge] = None,
-                  override val pendingEdgeOpt: Option[S2Edge] = None,
+                  override val originalEdgeOpt: Option[S2EdgeLike] = None,
+                  override val pendingEdgeOpt: Option[S2EdgeLike] = None,
                   override val statusCode: Byte = 0,
                   override val lockTs: Option[Long] = None,
-                  var tsInnerValOpt: Option[InnerValLike] = None) extends S2EdgeLike with GraphElement {
+                  var tsInnerValOpt: Option[InnerValLike] = None) extends S2EdgeLike {
 
   //  if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
   //  assert(propsWithTs.contains(LabelMeta.timeStampSeq))
@@ -335,10 +335,10 @@ case class S2Edge(override val innerGraph: S2Graph,
   //  }
 
 
-  def toLogString: String = {
-    //    val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
-    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
-  }
+//  def toLogString: String = {
+//    //    val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
+//    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
+//  }
 
   override def hashCode(): Int = {
     id().hashCode()
@@ -455,7 +455,7 @@ object S2Edge {
   type State = Map[LabelMeta, InnerValLikeWithTs]
   type PropsPairWithTs = (State, State, Long, String)
   type MergeState = PropsPairWithTs => (State, Boolean)
-  type UpdateFunc = (Option[S2Edge], S2Edge, MergeState)
+  type UpdateFunc = (Option[S2EdgeLike], S2EdgeLike, MergeState)
 
   def EmptyProps = new java.util.HashMap[String, S2Property[_]]
   def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs]
@@ -490,7 +490,7 @@ object S2Edge {
   def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = {
     state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) }
   }
-  def fillPropsWithTs(edge: S2Edge, state: State): Unit = {
+  def fillPropsWithTs(edge: S2EdgeLike, state: State): Unit = {
     state.foreach { case (k, v) => edge.propertyInner(k.name, v.innerVal.value, v.ts) }
   }
 
@@ -500,7 +500,7 @@ object S2Edge {
     }.toMap
   }
 
-  def stateToProps(edge: S2Edge, state: State): Props = {
+  def stateToProps(edge: S2EdgeLike, state: State): Props = {
     state.foreach { case (k, v) =>
       edge.propertyInner(k.name, v.innerVal.value, v.ts)
     }
@@ -533,7 +533,7 @@ object S2Edge {
       ret
     }
 
-  def buildDeleteBulk(invertedEdge: Option[S2Edge], requestEdge: S2Edge): (S2Edge, EdgeMutate) = {
+  def buildDeleteBulk(invertedEdge: Option[S2EdgeLike], requestEdge: S2EdgeLike): (S2EdgeLike, EdgeMutate) = {
     //    assert(invertedEdge.isEmpty)
     //    assert(requestEdge.op == GraphUtil.operations("delete"))
 
@@ -543,7 +543,7 @@ object S2Edge {
     (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, newSnapshotEdge = edgeInverted))
   }
 
-  def buildOperation(invertedEdge: Option[S2Edge], requestEdges: Seq[S2Edge]): (S2Edge, EdgeMutate) = {
+  def buildOperation(invertedEdge: Option[S2EdgeLike], requestEdges: Seq[S2EdgeLike]): (S2EdgeLike, EdgeMutate) = {
     //            logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
     //            logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
     val oldPropsWithTs =
@@ -551,21 +551,21 @@ object S2Edge {
       else propsToState(invertedEdge.get.propsWithTs)
 
     val funcs = requestEdges.map { edge =>
-      if (edge.op == GraphUtil.operations("insert")) {
+      if (edge.getOp() == GraphUtil.operations("insert")) {
         edge.innerLabel.consistencyLevel match {
           case "strong" => S2Edge.mergeUpsert _
           case _ => S2Edge.mergeInsertBulk _
         }
-      } else if (edge.op == GraphUtil.operations("insertBulk")) {
+      } else if (edge.getOp() == GraphUtil.operations("insertBulk")) {
         S2Edge.mergeInsertBulk _
-      } else if (edge.op == GraphUtil.operations("delete")) {
+      } else if (edge.getOp() == GraphUtil.operations("delete")) {
         edge.innerLabel.consistencyLevel match {
           case "strong" => S2Edge.mergeDelete _
           case _ => throw new RuntimeException("not supported")
         }
       }
-      else if (edge.op == GraphUtil.operations("update")) S2Edge.mergeUpdate _
-      else if (edge.op == GraphUtil.operations("increment")) S2Edge.mergeIncrement _
+      else if (edge.getOp() == GraphUtil.operations("update")) S2Edge.mergeUpdate _
+      else if (edge.getOp() == GraphUtil.operations("increment")) S2Edge.mergeIncrement _
       else throw new RuntimeException(s"not supported operation on edge: $edge")
     }
 
@@ -587,7 +587,7 @@ object S2Edge {
       }
       val requestTs = requestEdge.ts
       /* version should be monotoniously increasing so our RPC mutation should be applied safely */
-      val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
+      val newVersion = invertedEdge.map(e => e.getVersion() + incrementVersion).getOrElse(requestTs)
       val maxTs = prevPropsWithTs.map(_._2.ts).max
       val newTs = if (maxTs > requestTs) maxTs else requestTs
       val propsWithTs = prevPropsWithTs ++
@@ -597,14 +597,14 @@ object S2Edge {
 
       //      logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
       //      logger.error(s"$propsWithTs")
-      val newEdge = requestEdge.copy(propsWithTs = EmptyProps)
-      fillPropsWithTs(newEdge, propsWithTs)
+      val newEdge =requestEdge.copyEdgeWithState(propsWithTs)
+
       (newEdge, edgeMutate)
     }
   }
 
-  def buildMutation(snapshotEdgeOpt: Option[S2Edge],
-                    requestEdge: S2Edge,
+  def buildMutation(snapshotEdgeOpt: Option[S2EdgeLike],
+                    requestEdge: S2EdgeLike,
                     newVersion: Long,
                     oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs],
                     newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = {
@@ -615,14 +615,14 @@ object S2Edge {
     } else {
       val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq)
       val newOp = snapshotEdgeOpt match {
-        case None => requestEdge.op
+        case None => requestEdge.getOp()
         case Some(old) =>
           val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max
-          if (oldMaxTs > requestEdge.ts) old.op
-          else requestEdge.op
+          if (oldMaxTs > requestEdge.ts) old.getOp()
+          else requestEdge.getOp()
       }
 
-      val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs)
+      val newSnapshotEdge = requestEdge.copyOp(newOp).copyVersion(newVersion).copyEdgeWithState(newPropsWithTs)
 
       val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge)
       // delete request must always update snapshot.
@@ -631,8 +631,8 @@ object S2Edge {
         EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt)
       } else {
         val edgesToDelete = snapshotEdgeOpt match {
-          case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
-            snapshotEdge.copy(op = GraphUtil.defaultOpByte)
+          case Some(snapshotEdge) if snapshotEdge.getOp() != GraphUtil.operations("delete") =>
+            snapshotEdge.copyOp(GraphUtil.defaultOpByte)
               .relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
           case _ => Nil
         }
@@ -640,11 +640,7 @@ object S2Edge {
         val edgesToInsert =
           if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
           else {
-            val newEdge = requestEdge.copy(
-              version = newVersion,
-              propsWithTs = S2Edge.EmptyProps,
-              op = GraphUtil.defaultOpByte
-            )
+            val newEdge = requestEdge.copyOp(GraphUtil.defaultOpByte).copyVersion(newVersion).copyEdgeWithState(S2Edge.EmptyState)
             newPropsWithTs.foreach { case (k, v) => newEdge.propertyInner(k.name, v.innerVal.value, v.ts) }
 
             newEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index 04ed7ab..bb58554 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -14,7 +14,7 @@ import play.api.libs.json.Json
 import scala.concurrent.Await
 import scala.collection.JavaConverters._
 
-trait S2EdgeLike extends Edge {
+trait S2EdgeLike extends Edge with GraphElement {
   this: S2Edge =>
 
   val innerGraph: S2Graph
@@ -27,8 +27,8 @@ trait S2EdgeLike extends Edge {
 //  var version: Long = System.currentTimeMillis()
   val propsWithTs: Props = S2Edge.EmptyProps
   val parentEdges: Seq[EdgeWithScore] = Nil
-  val originalEdgeOpt: Option[S2Edge] = None
-  val pendingEdgeOpt: Option[S2Edge] = None
+  val originalEdgeOpt: Option[S2EdgeLike] = None
+  val pendingEdgeOpt: Option[S2EdgeLike] = None
   val statusCode: Byte = 0
   val lockTs: Option[Long] = None
 //  var tsInnerValOpt: Option[InnerValLike] = None
@@ -48,6 +48,13 @@ trait S2EdgeLike extends Edge {
   lazy val labelName = innerLabel.label
   lazy val direction = GraphUtil.fromDirection(dir)
 
+  def getOp(): Byte = op
+  def setOp(newOp: Byte): Unit = op = newOp
+  def getVersion(): Long = version
+  def setVersion(newVersion: Long): Unit = version = newVersion
+  def getTsInnerValOpt(): Option[InnerValLike] = tsInnerValOpt
+  def setTsInnerValOpt(newTsInnerValOpt: Option[InnerValLike]): Unit = tsInnerValOpt = newTsInnerValOpt
+
   def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
 
   def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
@@ -196,6 +203,11 @@ trait S2EdgeLike extends Edge {
     S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
   }
 
+  def updateOriginalEdgeOpt(newOriginalEdgeOpt: S2EdgeLike): S2EdgeLike = {
+    S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs, parentEdges,
+      Option(newOriginalEdgeOpt), pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+  }
+
   def rank(r: RankParam): Double =
     if (r.keySeqAndWeights.size <= 0) 1.0f
     else {
@@ -225,12 +237,12 @@ trait S2EdgeLike extends Edge {
                version: Long = version,
                propsWithTs: State = S2Edge.propsToState(this.propsWithTs),
                parentEdges: Seq[EdgeWithScore] = parentEdges,
-               originalEdgeOpt: Option[S2Edge] = originalEdgeOpt,
-               pendingEdgeOpt: Option[S2Edge] = pendingEdgeOpt,
+               originalEdgeOpt: Option[S2EdgeLike] = originalEdgeOpt,
+               pendingEdgeOpt: Option[S2EdgeLike] = pendingEdgeOpt,
                statusCode: Byte = statusCode,
                lockTs: Option[Long] = lockTs,
                tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
-               ts: Long = ts): S2Edge = {
+               ts: Long = ts): S2EdgeLike = {
     val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps,
       parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
     S2Edge.fillPropsWithTs(edge, propsWithTs)
@@ -238,19 +250,39 @@ trait S2EdgeLike extends Edge {
     edge
   }
 
-  def copyEdgeWithState(state: State, ts: Long): S2Edge = {
+  def copyEdgeWithState(state: State, ts: Long): S2EdgeLike = {
     val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
     S2Edge.fillPropsWithTs(newEdge, state)
     newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts)
     newEdge
   }
 
-  def copyEdgeWithState(state: State): S2Edge = {
+  def copyEdgeWithState(state: State): S2EdgeLike = {
     val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
     S2Edge.fillPropsWithTs(newEdge, state)
     newEdge
   }
 
+  def copyOp(newOp: Byte): S2EdgeLike = {
+    copy(op = newOp)
+  }
+
+  def copyVersion(newVersion: Long): S2EdgeLike = {
+    copy(version = newVersion)
+  }
+
+  def copyParentEdges(parents: Seq[EdgeWithScore]): S2EdgeLike = {
+    copy(parentEdges = parents)
+  }
+
+  def copyStatusCode(newStatusCode: Byte): S2EdgeLike = {
+    copy(statusCode = newStatusCode)
+  }
+
+  def copyLockTs(newLockTs: Option[Long]): S2EdgeLike = {
+    copy(lockTs = newLockTs)
+  }
+
   def vertices(direction: Direction): util.Iterator[structure.Vertex] = {
     val arr = new util.ArrayList[Vertex]()
 
@@ -373,7 +405,13 @@ trait S2EdgeLike extends Edge {
     }
   }
 
+  def toLogString: String = {
+    //    val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
+    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
+  }
+
   private def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) =
     if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 90190cf..8bb95fc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -201,13 +201,13 @@ object S2Graph {
   }
 
   /** common methods for filter out, transform, aggregate queryResult */
-  def convertEdges(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = {
+  def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = {
     for {
       convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
     } yield convertedEdge
   }
 
-  def processTimeDecay(queryParam: QueryParam, edge: S2Edge) = {
+  def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = {
     /* process time decay */
     val tsVal = queryParam.timeDecay match {
       case None => 1.0
@@ -258,7 +258,7 @@ object S2Graph {
     }
   }
 
-  def toHashKey(queryParam: QueryParam, edge: S2Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
+  def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = {
     val src = edge.srcVertex.innerId.hashCode()
     val tgt = edge.tgtVertex.innerId.hashCode()
     val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
@@ -435,7 +435,7 @@ object S2Graph {
         val tsVal = processTimeDecay(queryParam, edge)
         val newScore = degreeScore + score
         //          val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
-        val newEdge = edge.copy(parentEdges = parents)
+        val newEdge = edge.copyParentEdges(parents)
         edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
       }
 
@@ -971,7 +971,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
 
   def fallback = Future.successful(StepResult.Empty)
 
-  def checkEdges(edges: Seq[S2Edge]): Future[StepResult] = {
+  def checkEdges(edges: Seq[S2EdgeLike]): Future[StepResult] = {
     val futures = for {
       edge <- edges
     } yield {
@@ -1319,11 +1319,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   def mutateElements(elements: Seq[GraphElement],
                      withWait: Boolean = false): Future[Seq[MutateResponse]] = {
 
-    val edgeBuffer = ArrayBuffer[(S2Edge, Int)]()
+    val edgeBuffer = ArrayBuffer[(S2EdgeLike, Int)]()
     val vertexBuffer = ArrayBuffer[(S2VertexLike, Int)]()
 
     elements.zipWithIndex.foreach {
-      case (e: S2Edge, idx: Int) => edgeBuffer.append((e, idx))
+      case (e: S2EdgeLike, idx: Int) => edgeBuffer.append((e, idx))
       case (v: S2VertexLike, idx: Int) => vertexBuffer.append((v, idx))
       case any@_ => logger.error(s"Unknown type: ${any}")
     }
@@ -1347,13 +1347,13 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
 
   //  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
 
-  def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
+  def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
     val edgeWithIdxs = edges.zipWithIndex
 
     val (strongEdges, weakEdges) =
       edgeWithIdxs.partition { case (edge, idx) =>
         val e = edge
-        e.innerLabel.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")
+        e.innerLabel.consistencyLevel == "strong" && e.getOp() != GraphUtil.operations("insertBulk")
       }
 
     val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
@@ -1365,7 +1365,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
         /* multiple edges with weak consistency level will be processed as batch */
         val mutations = edges.flatMap { edge =>
           val (_, edgeUpdate) =
-            if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
+            if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
             else S2Edge.buildOperation(None, Seq(edge))
 
           val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
@@ -1380,7 +1380,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
       }
       Future.sequence(futures)
     }
-    val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") }
+    val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.getOp() == GraphUtil.operations("deleteAll") }
 
     val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
       deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _)
@@ -1404,7 +1404,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     }
   }
 
-  private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = {
+  private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[Boolean]] = {
 
     val edgeWithIdxs = _edges.zipWithIndex
     val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
@@ -1440,7 +1440,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   }
 
 
-  private def mutateEdgesInner(storage: Storage)(edges: Seq[S2Edge],
+  private def mutateEdgesInner(storage: Storage)(edges: Seq[S2EdgeLike],
                        checkConsistency: Boolean,
                        withWait: Boolean): Future[MutateResponse] = {
     assert(edges.nonEmpty)
@@ -1495,8 +1495,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
 
 
 
-  def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
-    def incrementCounts(storage: Storage)(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
+  def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
+    def incrementCounts(storage: Storage)(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
       val futures = for {
         edge <- edges
       } yield {
@@ -1523,7 +1523,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     }
   }
 
-  def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[MutateResponse] = {
+  def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = {
     val label = edge.innerLabel
 
     val storage = getStorage(label)
@@ -1571,11 +1571,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     toVertex(GraphUtil.split(s))
   }
 
-  def toEdge(s: String): Option[S2Edge] = {
+  def toEdge(s: String): Option[S2EdgeLike] = {
     toEdge(GraphUtil.split(s))
   }
 
-  def toEdge(parts: Array[String]): Option[S2Edge] = Try {
+  def toEdge(parts: Array[String]): Option[S2EdgeLike] = Try {
     val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
     val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
     val tempDirection = if (parts.length >= 8) parts(7) else "out"
@@ -1605,7 +1605,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
              direction: String,
              props: Map[String, Any] = Map.empty,
              ts: Long = System.currentTimeMillis(),
-             operation: String = "insert"): S2Edge = {
+             operation: String = "insert"): S2EdgeLike = {
     val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
 
     val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn
@@ -1649,7 +1649,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     vertex
   }
 
-  def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2Edge = {
+  def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2EdgeLike = {
     val srcVertex = queryRequest.vertex
     val queryParam = queryRequest.queryParam
     val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
@@ -1710,11 +1710,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
               version: Long = System.currentTimeMillis(),
               propsWithTs: S2Edge.State,
               parentEdges: Seq[EdgeWithScore] = Nil,
-              originalEdgeOpt: Option[S2Edge] = None,
-              pendingEdgeOpt: Option[S2Edge] = None,
+              originalEdgeOpt: Option[S2EdgeLike] = None,
+              pendingEdgeOpt: Option[S2EdgeLike] = None,
               statusCode: Byte = 0,
               lockTs: Option[Long] = None,
-              tsInnerValOpt: Option[InnerValLike] = None): S2Edge = {
+              tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = {
     val edge = S2Edge(
       this,
       srcVertex,
@@ -1758,7 +1758,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
                                     op: Byte,
                                     version: Long,
                                     propsWithTs: S2Edge.State,
-                                    pendingEdgeOpt: Option[S2Edge],
+                                    pendingEdgeOpt: Option[S2EdgeLike],
                                     statusCode: Byte = 0,
                                     lockTs: Option[Long],
                                     tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
@@ -1874,7 +1874,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
 
   def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = {
     val s2EdgeIds = edgeIds.collect {
-      case s2Edge: S2Edge => s2Edge.id().asInstanceOf[EdgeId]
+      case s2Edge: S2EdgeLike => s2Edge.id().asInstanceOf[EdgeId]
       case id: EdgeId => id
       case s: String => EdgeId.fromString(s)
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index e86c17f..50b94de 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -102,7 +102,7 @@ object S2Property {
   }
 }
 
-case class S2Property[V](element: S2Edge,
+case class S2Property[V](element: S2EdgeLike,
                          labelMeta: LabelMeta,
                          key: String,
                          v: V,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index 5a8f722..b88c18d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -71,7 +71,7 @@ trait S2VertexLike extends Vertex with GraphElement {
     val arr = new util.ArrayList[Vertex]()
     edges(direction, edgeLabels: _*).forEachRemaining(new Consumer[Edge] {
       override def accept(edge: Edge): Unit = {
-        val s2Edge = edge.asInstanceOf[S2Edge]
+        val s2Edge = edge.asInstanceOf[S2EdgeLike]
 
         s2Edge.direction match {
           case "out" => arr.add(edge.inVertex())

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index 098d0b4..e5005b7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -29,7 +29,7 @@ import org.apache.lucene.queryparser.classic.{ParseException, QueryParser}
 import org.apache.lucene.search.IndexSearcher
 import org.apache.lucene.store.{BaseDirectory, RAMDirectory}
 import org.apache.s2graph.core.io.Conversions
-import org.apache.s2graph.core.{EdgeId, S2Edge, S2Vertex, S2VertexLike}
+import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.types.{InnerValLike, VertexId}
 import org.apache.s2graph.core.utils.logger
@@ -130,8 +130,8 @@ trait IndexProvider {
   def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean]
   def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]]
 
-  def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean]
-  def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]]
+  def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean]
+  def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]]
 
   def shutdown(): Unit
 }
@@ -179,7 +179,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     }
   }
 
-  private def toDocument(globalIndex: GlobalIndex, edge: S2Edge): Option[Document] = {
+  private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): Option[Document] = {
     val props = edge.propsWithTs.asScala
     val exist = props.exists(t => globalIndex.propNamesSet(t._1))
     if (!exist) None
@@ -222,7 +222,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     vertices.map(_ => true)
   }
 
-  override def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] = {
+  override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = {
     val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType)
 
     globalIndexOptions.map { globalIndex =>
@@ -316,5 +316,5 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
 
   override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices))
 
-  override def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges))
+  override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges))
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index d754bb7..a0d56b2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.parsers
 import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException}
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.types.InnerValLike
-import org.apache.s2graph.core.{S2Edge, GraphUtil}
+import org.apache.s2graph.core.{GraphUtil, S2Edge, S2EdgeLike}
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.utils.logger
 
@@ -33,7 +33,7 @@ import scala.util.parsing.combinator.JavaTokenParsers
 trait ExtractValue {
   val parent = "_parent."
 
-  def propToInnerVal(edge: S2Edge, key: String) = {
+  def propToInnerVal(edge: S2EdgeLike, key: String) = {
     val (propKey, parentEdge) = findParentEdge(edge, key)
 
     val label = parentEdge.innerLabel
@@ -47,7 +47,7 @@ trait ExtractValue {
     }
   }
 
-  def valueToCompare(edge: S2Edge, key: String, value: String) = {
+  def valueToCompare(edge: S2EdgeLike, key: String, value: String) = {
     val label = edge.innerLabel
     if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value)
     else {
@@ -65,11 +65,11 @@ trait ExtractValue {
   }
 
   @tailrec
-  private def findParent(edge: S2Edge, depth: Int): S2Edge =
+  private def findParent(edge: S2EdgeLike, depth: Int): S2EdgeLike =
     if (depth > 0) findParent(edge.parentEdges.head.edge, depth - 1)
     else edge
 
-  private def findParentEdge(edge: S2Edge, key: String): (String, S2Edge) = {
+  private def findParentEdge(edge: S2EdgeLike, key: String): (String, S2EdgeLike) = {
     if (!key.startsWith(parent)) (key, edge)
     else {
       val split = key.split(parent)
@@ -88,9 +88,9 @@ trait Clause extends ExtractValue {
 
   def or(otherField: Clause): Clause = Or(this, otherField)
 
-  def filter(edge: S2Edge): Boolean
+  def filter(edge: S2EdgeLike): Boolean
 
-  def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: S2Edge): Boolean = {
+  def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: S2EdgeLike): Boolean = {
     val propValue = propToInnerVal(edge, propKey)
     val compValue = valueToCompare(edge, propKey, value)
 
@@ -105,20 +105,20 @@ object Where {
   }
 }
 case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) {
-  def filter(edge: S2Edge) =
+  def filter(edge: S2EdgeLike) =
     if (clauses.isEmpty) true else clauses.map(_.filter(edge)).forall(identity)
 }
 
 case class Gt(propKey: String, value: String) extends Clause {
-  override def filter(edge: S2Edge): Boolean = binaryOp(_ > _)(propKey, value)(edge)
+  override def filter(edge: S2EdgeLike): Boolean = binaryOp(_ > _)(propKey, value)(edge)
 }
 
 case class Lt(propKey: String, value: String) extends Clause {
-  override def filter(edge: S2Edge): Boolean = binaryOp(_ < _)(propKey, value)(edge)
+  override def filter(edge: S2EdgeLike): Boolean = binaryOp(_ < _)(propKey, value)(edge)
 }
 
 case class Eq(propKey: String, value: String) extends Clause {
-  override def filter(edge: S2Edge): Boolean = binaryOp(_ == _)(propKey, value)(edge)
+  override def filter(edge: S2EdgeLike): Boolean = binaryOp(_ == _)(propKey, value)(edge)
 }
 
 case class InWithoutParent(label: Label, propKey: String, values: Set[String]) extends Clause {
@@ -144,7 +144,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e
     toInnerVal(value, dataType, label.schemaVersion)
   }
 
-  override def filter(edge: S2Edge): Boolean = {
+  override def filter(edge: S2EdgeLike): Boolean = {
     if (edge.dir == GraphUtil.directions("in")) {
       val propVal = propToInnerVal(edge, propKey)
       innerValLikeLsIn.contains(propVal)
@@ -156,7 +156,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e
 }
 
 case class IN(propKey: String, values: Set[String]) extends Clause {
-  override def filter(edge: S2Edge): Boolean = {
+  override def filter(edge: S2EdgeLike): Boolean = {
     val propVal = propToInnerVal(edge, propKey)
     values.exists { value =>
       valueToCompare(edge, propKey, value) == propVal
@@ -165,7 +165,7 @@ case class IN(propKey: String, values: Set[String]) extends Clause {
 }
 
 case class Between(propKey: String, minValue: String, maxValue: String) extends Clause {
-  override def filter(edge: S2Edge): Boolean = {
+  override def filter(edge: S2EdgeLike): Boolean = {
     val propVal = propToInnerVal(edge, propKey)
     val minVal = valueToCompare(edge, propKey, minValue)
     val maxVal = valueToCompare(edge, propKey, maxValue)
@@ -175,15 +175,15 @@ case class Between(propKey: String, minValue: String, maxValue: String) extends
 }
 
 case class Not(self: Clause) extends Clause {
-  override def filter(edge: S2Edge) = !self.filter(edge)
+  override def filter(edge: S2EdgeLike) = !self.filter(edge)
 }
 
 case class And(left: Clause, right: Clause) extends Clause {
-  override def filter(edge: S2Edge) = left.filter(edge) && right.filter(edge)
+  override def filter(edge: S2EdgeLike) = left.filter(edge) && right.filter(edge)
 }
 
 case class Or(left: Clause, right: Clause) extends Clause {
-  override def filter(edge: S2Edge) = left.filter(edge) || right.filter(edge)
+  override def filter(edge: S2EdgeLike) = left.filter(edge) || right.filter(edge)
 }
 
 object WhereParser {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 55b6e12..6afbd87 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -548,12 +548,12 @@ class RequestParser(graph: S2Graph) {
     elementsWithTsv
   }
 
-  def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = {
+  def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(S2EdgeLike, String)] = {
     val jsValues = toJsValues(jsValue)
     jsValues.flatMap(toEdgeWithTsv(_, operation))
   }
 
-  private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = {
+  private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(S2EdgeLike, String)] = {
     val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil)
     val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 01dd128..e6075ec 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -93,13 +93,13 @@ abstract class Storage(val graph: S2Graph,
   def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
     io.buildIncrementsCountAsync(indexedEdge, amount)
 
-  def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] =
+  def buildVertexPutsAsync(edge: S2EdgeLike): Seq[SKeyValue] =
     io.buildVertexPutsAsync(edge)
 
   def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
     io.snapshotEdgeMutations(edgeMutate)
 
-  def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] =
+  def buildDegreePuts(edge: S2EdgeLike, degreeVal: Long): Seq[SKeyValue] =
     io.buildDegreePuts(edge, degreeVal)
 
   def buildPutsAll(vertex: S2VertexLike): Seq[SKeyValue] =
@@ -121,15 +121,15 @@ abstract class Storage(val graph: S2Graph,
   def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] =
     fetcher.fetchVertices(vertices)
 
-  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = fetcher.fetchEdgesAll()
+  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = fetcher.fetchEdgesAll()
 
   def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = fetcher.fetchVerticesAll()
 
-  def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] =
+  def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] =
     fetcher.fetchSnapshotEdgeInner(edge)
 
   /** Conflict Resolver **/
-  def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] =
+  def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] =
     conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt)
 
   /** Management **/

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 4014b6d..d0a59b2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -32,8 +32,8 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
   /** Parsing Logic: parse from kv from Storage into Edge */
   def toEdge[K: CanSKeyValue](kv: K,
                               queryRequest: QueryRequest,
-                              cacheElementOpt: Option[S2Edge],
-                              parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = {
+                              cacheElementOpt: Option[S2EdgeLike],
+                              parentEdges: Seq[EdgeWithScore]): Option[S2EdgeLike] = {
     logger.debug(s"toEdge: $kv")
 
     try {
@@ -41,7 +41,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
       val queryParam = queryRequest.queryParam
       val schemaVer = queryParam.label.schemaVersion
       val indexEdgeOpt = serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt)
-      if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copy(parentEdges = parentEdges))
+      if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copyParentEdges(parentEdges))
       else indexEdgeOpt
     } catch {
       case ex: Exception =>
@@ -54,7 +54,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
                                       queryRequest: QueryRequest,
                                       cacheElementOpt: Option[SnapshotEdge] = None,
                                       isInnerCall: Boolean,
-                                      parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = {
+                                      parentEdges: Seq[EdgeWithScore]): Option[S2EdgeLike] = {
     //        logger.debug(s"SnapshottoEdge: $kv")
     val queryParam = queryRequest.queryParam
     val schemaVer = queryParam.label.schemaVersion
@@ -62,7 +62,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
 
     if (isInnerCall) {
       snapshotEdgeOpt.flatMap { snapshotEdge =>
-        val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
+        val edge = snapshotEdge.toEdge.copyParentEdges(parentEdges)
         if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
         else None
       }
@@ -70,7 +70,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
       snapshotEdgeOpt.flatMap { snapshotEdge =>
         if (snapshotEdge.allPropsDeleted) None
         else {
-          val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
+          val edge = snapshotEdge.toEdge.copyParentEdges(parentEdges)
           if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
           else None
         }
@@ -144,7 +144,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
           }
           val tsVal = processTimeDecay(queryParam, edge)
           val newScore = degreeScore + score
-          EdgeWithScore(convertedEdge.copy(parentEdges = parentEdges), score = newScore * labelWeight * tsVal, label = label)
+          EdgeWithScore(convertedEdge.copyParentEdges(parentEdges), score = newScore * labelWeight * tsVal, label = label)
         }
 
         val sampled =
@@ -229,11 +229,11 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
     }
   }
 
-  def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = {
+  def buildVertexPutsAsync(edge: S2EdgeLike): Seq[SKeyValue] = {
     val storeVertex = edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false)
 
     if (storeVertex) {
-      if (edge.op == GraphUtil.operations("delete"))
+      if (edge.getOp() == GraphUtil.operations("delete"))
         buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
       else
         serDe.vertexSerializer(edge.srcForVertex).toKeyValues ++ serDe.vertexSerializer(edge.tgtForVertex).toKeyValues
@@ -242,7 +242,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
     }
   }
 
-  def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = {
+  def buildDegreePuts(edge: S2EdgeLike, degreeVal: Long): Seq[SKeyValue] = {
     edge.propertyInner(LabelMeta.degree.name, degreeVal, edge.ts)
     val kvs = edge.edgesWithIndexValid.flatMap { indexEdge =>
       serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index c3b38e8..44bd4dc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -39,16 +39,16 @@ trait StorageReadable {
   def fetches(queryRequests: Seq[QueryRequest],
               prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
 
-  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]]
+  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
 
   def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
 
-  protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+  protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
 
   protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
 
 
-  def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] = {
+  def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
     val queryParam = QueryParam(labelName = edge.innerLabel.label,
       direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
       tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
index 227cfa7..854fc18 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
@@ -52,7 +52,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
     Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt)
   }
 
-  def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = {
+  def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = {
     if (tryNum >= MaxRetryNum) {
       edges.foreach { edge =>
         logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
@@ -112,9 +112,9 @@ class WriteWriteConflictResolver(graph: S2Graph,
     }
   }
 
-  protected def commitUpdate(edges: Seq[S2Edge],
+  protected def commitUpdate(edges: Seq[S2EdgeLike],
                              statusCode: Byte,
-                             fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = {
+                             fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = {
     //    Future.failed(new PartialFailureException(edges.head, 0, "ahahah"))
     assert(edges.nonEmpty)
     //    assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined)
@@ -135,7 +135,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
             assert(edgeMutate.newSnapshotEdge.isDefined)
 
             val lockTs = Option(System.currentTimeMillis())
-            val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = squashedEdge.ts + 1)
+            val pendingEdge = squashedEdge.copyStatusCode(1).copyLockTs(lockTs).copyVersion(squashedEdge.ts + 1)
             val lockSnapshotEdge = squashedEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
             val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
               pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
@@ -158,7 +158,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
                   Future.successful(true)
                 } else {
                   val lockTs = Option(System.currentTimeMillis())
-                  val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1)
+                  val pendingEdge = squashedEdge.copyStatusCode(1).copyLockTs(lockTs).copyVersion(snapshotEdge.getVersion() + 1)
                   val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
                   val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
                     pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
@@ -182,7 +182,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
                     else S2Edge.buildOperation(fetchedSnapshotEdgeOpt, pendingEdge +: edges)
 
                   val lockTs = Option(System.currentTimeMillis())
-                  val newPendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1)
+                  val newPendingEdge = squashedEdge.copyStatusCode(1).copyLockTs(lockTs).copyVersion(snapshotEdge.getVersion() + 1)
                   val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(newPendingEdge))
                   val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
                     pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
@@ -222,7 +222,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
           if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges
           else edges
         val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges)
-        val newVersion = fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2
+        val newVersion = fetchedSnapshotEdgeOpt.map(_.getVersion()).getOrElse(squashedEdge.ts) + 2
         val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match {
           case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion)
           case Some(newSnapshotEdge) => newSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion)
@@ -246,8 +246,8 @@ class WriteWriteConflictResolver(graph: S2Graph,
     * @return
     */
   protected def commitProcess(statusCode: Byte,
-                              squashedEdge: S2Edge,
-                              fetchedSnapshotEdgeOpt: Option[S2Edge],
+                              squashedEdge: S2EdgeLike,
+                              fetchedSnapshotEdgeOpt: Option[S2EdgeLike],
                               lockSnapshotEdge: SnapshotEdge,
                               releaseLockSnapshotEdge: SnapshotEdge,
                               edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = {
@@ -259,7 +259,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
     } yield lockReleased
   }
 
-  case class PartialFailureException(edge: S2Edge, statusCode: Byte, failReason: String) extends NoStackException(failReason)
+  case class PartialFailureException(edge: S2EdgeLike, statusCode: Byte, failReason: String) extends NoStackException(failReason)
 
   protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = {
     val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n")
@@ -282,8 +282,8 @@ class WriteWriteConflictResolver(graph: S2Graph,
     * @return
     */
   protected def acquireLock(statusCode: Byte,
-                            squashedEdge: S2Edge,
-                            fetchedSnapshotEdgeOpt: Option[S2Edge],
+                            squashedEdge: S2EdgeLike,
+                            fetchedSnapshotEdgeOpt: Option[S2EdgeLike],
                             lockEdge: SnapshotEdge)(implicit ec: ExecutionContext): Future[Boolean] = {
     if (statusCode >= 1) {
       logger.debug(s"skip acquireLock: [$statusCode]\n${squashedEdge.toLogString}")
@@ -334,7 +334,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
     */
   protected def releaseLock(predicate: Boolean,
                             statusCode: Byte,
-                            squashedEdge: S2Edge,
+                            squashedEdge: S2EdgeLike,
                             releaseLockEdge: SnapshotEdge)(implicit ec: ExecutionContext): Future[Boolean] = {
     if (!predicate) {
       Future.failed(new PartialFailureException(squashedEdge, 3, "predicate failed."))
@@ -379,7 +379,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
     */
   protected def commitIndexEdgeMutations(predicate: Boolean,
                                          statusCode: Byte,
-                                         squashedEdge: S2Edge,
+                                         squashedEdge: S2EdgeLike,
                                          edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = {
     if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, "predicate failed."))
     else {
@@ -413,7 +413,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
     */
   protected def commitIndexEdgeDegreeMutations(predicate: Boolean,
                                                statusCode: Byte,
-                                               squashedEdge: S2Edge,
+                                               squashedEdge: S2EdgeLike,
                                                edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = {
 
     def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
@@ -445,8 +445,8 @@ class WriteWriteConflictResolver(graph: S2Graph,
 
   /** end of methods for consistency */
 
-  def mutateLog(snapshotEdgeOpt: Option[S2Edge], edges: Seq[S2Edge],
-                newEdge: S2Edge, edgeMutate: EdgeMutate) =
+  def mutateLog(snapshotEdgeOpt: Option[S2EdgeLike], edges: Seq[S2EdgeLike],
+                newEdge: S2EdgeLike, edgeMutate: EdgeMutate) =
     Seq("----------------------------------------------",
       s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}",
       s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}",

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index 92130f5..6be5f60 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -67,7 +67,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
     * @param queryRequest
     * @return
     */
-  private def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = {
+  private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike) = {
     import Serializable._
     val queryParam = queryRequest.queryParam
     val label = queryParam.label
@@ -178,7 +178,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
     Left(get)
   }
 
-  override def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext) = {
+  override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = {
     val rpc = buildRequest(queryRequest, edge)
     fetchKeyValues(rpc)
   }
@@ -221,7 +221,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
     }
   }
 
-  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = {
+  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
     val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
       val distinctLabels = labels.toSet
       val scan = AsynchbasePatcher.newScanner(client, hTableName)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 2501ed9..01f268b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -22,10 +22,9 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
-import org.apache.s2graph.core.storage.serde._
-import org.apache.s2graph.core.storage.serde.StorageDeserializable._
-import org.apache.s2graph.core.storage.serde.Deserializable
 import org.apache.s2graph.core.storage.CanSKeyValue
+import org.apache.s2graph.core.storage.serde.Deserializable
+import org.apache.s2graph.core.storage.serde.StorageDeserializable._
 import org.apache.s2graph.core.types._
 
 object IndexEdgeDeserializable{
@@ -33,13 +32,13 @@ object IndexEdgeDeserializable{
 }
 class IndexEdgeDeserializable(graph: S2Graph,
                               bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong,
-                              tallSchemaVersions: Set[String] = Set(HBaseType.VERSION4)) extends Deserializable[S2Edge] {
+                              tallSchemaVersions: Set[String] = Set(HBaseType.VERSION4)) extends Deserializable[S2EdgeLike] {
 
    type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
    type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
 
    override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
-                                               cacheElementOpt: Option[S2Edge]): Option[S2Edge] = {
+                                               cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = {
 
      try {
        assert(_kvs.size == 1)
@@ -79,8 +78,8 @@ class IndexEdgeDeserializable(graph: S2Graph,
            edge.propertyInner(LabelMeta.timestamp.name, version, version)
            edge.propertyInner(LabelMeta.degree.name, degreeVal, version)
            edge.tgtVertex = graph.newVertex(tgtVertexId, version)
-           edge.op = GraphUtil.defaultOpByte
-           edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+           edge.setOp(GraphUtil.defaultOpByte)
+           edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
          } else {
            // not degree edge
            val (idxPropsRaw, endAt) =
@@ -150,8 +149,8 @@ class IndexEdgeDeserializable(graph: S2Graph,
 
            edge.propertyInner(LabelMeta.timestamp.name, tsVal, version)
            edge.tgtVertex = graph.newVertex(tgtVertexId, version)
-           edge.op = op
-           edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+           edge.setOp(op)
+           edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
          }
          Option(edge)
        }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 68732ce..a7fe8a1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -28,13 +28,13 @@ import org.apache.s2graph.core.storage.serde.Deserializable
 import org.apache.s2graph.core.types._
 
 class IndexEdgeDeserializable(graph: S2Graph,
-                              bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[S2Edge] {
+                              bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[S2EdgeLike] {
 
    type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
    type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
 
    override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
-                                               cacheElementOpt: Option[S2Edge]): Option[S2Edge] = {
+                                               cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = {
      try {
        assert(_kvs.size == 1)
 
@@ -68,8 +68,8 @@ class IndexEdgeDeserializable(graph: S2Graph,
            edge.propertyInner(LabelMeta.timestamp.name, version, version)
            edge.propertyInner(LabelMeta.degree.name, degreeVal, version)
            edge.tgtVertex = graph.newVertex(tgtVertexId, version)
-           edge.op = GraphUtil.defaultOpByte
-           edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+           edge.setOp(GraphUtil.defaultOpByte)
+           edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
          } else {
            pos = 0
            val (idxPropsRaw, endAt) = bytesToProps(kv.qualifier, pos, schemaVer)
@@ -123,8 +123,8 @@ class IndexEdgeDeserializable(graph: S2Graph,
 
            edge.propertyInner(LabelMeta.timestamp.name, tsVal, version)
            edge.tgtVertex = graph.newVertex(tgtVertexId, version)
-           edge.op = op
-           edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+           edge.setOp(op)
+           edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
          }
 
          Option(edge)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 5f00b48..24775e7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -52,7 +52,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
     snapshotEdge.pendingEdgeOpt match {
       case None => valueBytes()
       case Some(pendingEdge) =>
-        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
+        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp())
         val versionBytes = Array.empty[Byte]
         val propsBytes = pendingEdge.serializePropsWithTs()
         val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index df84e86..44f2596 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -59,7 +59,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
     snapshotEdge.pendingEdgeOpt match {
       case None => valueBytes()
       case Some(pendingEdge) =>
-        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
+        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp())
         val versionBytes = Array.empty[Byte]
         val propsBytes = pendingEdge.serializePropsWithTs()
         val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
index ad9299c..55658b9 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
@@ -37,7 +37,7 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
   val ts = System.currentTimeMillis()
   val dummyTs = LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)
 
-  def validate(label: Label)(edge: S2Edge)(sql: String)(expected: Boolean) = {
+  def validate(label: Label)(edge: S2EdgeLike)(sql: String)(expected: Boolean) = {
     def debug(whereOpt: Try[Where]) = {
       println("==================")
       println(s"$whereOpt")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
index d8b2cfa..c7d474b 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
@@ -37,6 +37,7 @@ import scala.collection.JavaConverters._
 object S2GraphProvider {
 
   val Implementation: Set[Class[_]] = Set(
+    classOf[S2EdgeLike],
     classOf[S2Edge],
     classOf[S2Vertex],
     classOf[S2VertexLike],

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 101b331..2b31fcd 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -54,7 +54,7 @@ object EdgeController extends Controller {
     graphElem match {
       case v: S2VertexLike =>
         enqueue(kafkaTopic, graphElem, tsv)
-      case e: S2Edge =>
+      case e: S2EdgeLike =>
         e.innerLabel.extraOptions.get("walLog") match {
           case None =>
             enqueue(kafkaTopic, e, tsv)
@@ -93,8 +93,8 @@ object EdgeController extends Controller {
     val result = s2.mutateElements(elements.map(_._1), true)
     result onComplete { results =>
       results.get.zip(elements).map {
-        case (r: MutateResponse, (e: S2Edge, tsv: String)) if !r.isSuccess =>
-          val kafkaMessages = if(e.op == GraphUtil.operations("deleteAll")){
+        case (r: MutateResponse, (e: S2EdgeLike, tsv: String)) if !r.isSuccess =>
+          val kafkaMessages = if(e.getOp() == GraphUtil.operations("deleteAll")){
             toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.labelWithDir.dir, e.ts)
           } else{
             Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv)))