You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/11/19 02:28:54 UTC

[11/23] incubator-s2graph git commit: add S2EdgeBuilder.

add S2EdgeBuilder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2b5df1dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2b5df1dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2b5df1dd

Branch: refs/heads/master
Commit: 2b5df1dd0ddc913dca92353b191e8982368f08c0
Parents: 7d08225
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Nov 6 23:08:25 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Tue Nov 7 00:50:54 2017 +0900

----------------------------------------------------------------------
 .../s2graph/core/GraphElementBuilder.scala      |   4 +-
 .../org/apache/s2graph/core/QueryParam.scala    |  21 +-
 .../scala/org/apache/s2graph/core/S2Edge.scala  |  63 ++++-
 .../org/apache/s2graph/core/S2EdgeBuilder.scala | 103 ++++++++
 .../org/apache/s2graph/core/S2EdgeLike.scala    | 258 ++++---------------
 .../apache/s2graph/core/storage/StorageIO.scala |   4 +-
 .../core/storage/serde/MutationHelper.scala     |   6 +-
 7 files changed, 234 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
index 21179aa..c9133b1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
@@ -279,10 +279,10 @@ class GraphElementBuilder(graph: S2Graph) {
         val edge = edgeWithScore.edge
         val copiedEdge = label.consistencyLevel match {
           case "strong" =>
-            edge.copyEdge(op = GraphUtil.operations("delete"),
+            edge.builder.copyEdge(op = GraphUtil.operations("delete"),
               version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
           case _ =>
-            edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
+            edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
         }
 
         val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index e98ef37..3d0d076 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -185,7 +185,7 @@ case class EdgeTransformer(jsValue: JsValue) {
             replace(queryParam, fmt, fieldNames.flatMap(fieldName => toInnerValOpt(queryParam, edge, fieldName)), nextStepOpt)
           }
         }
-      } yield edge.updateTgtVertex(innerVal).copy(originalEdgeOpt = Option(edge))
+      } yield edge.builder.updateTgtVertex(innerVal).copyOriginalEdgeOpt(Option(edge))
 
 
       edges
@@ -251,7 +251,26 @@ case class RankParam(keySeqAndWeights: Seq[(LabelMeta, Double)] = Seq((LabelMeta
     }
     bytes
   }
+
+  def score(edge: S2EdgeLike): Double = {
+    if (keySeqAndWeights.size <= 0) 1.0f
+    else {
+      var sum: Double = 0
+
+      for ((labelMeta, w) <- keySeqAndWeights) {
+        if (edge.getPropsWithTs().containsKey(labelMeta.name)) {
+          val innerValWithTs = edge.getPropsWithTs().get(labelMeta.name)
+          val cost = try innerValWithTs.innerVal.toString.toDouble catch {
+            case e: Exception => 1.0
+          }
+          sum += w * cost
+        }
+      }
+      sum
+    }
+  }
 }
+
 object QueryParam {
   lazy val Empty = QueryParam(labelName = "")
   lazy val DefaultThreshold = Double.MinValue

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 3529991..03678c8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -40,6 +40,17 @@ import scala.concurrent.Await
 import scala.util.hashing.MurmurHash3
 
 object SnapshotEdge {
+  def apply(e: S2EdgeLike): SnapshotEdge = {
+    val (smaller, larger) = (e.srcForVertex, e.tgtForVertex)
+
+    val snapshotEdge = SnapshotEdge(e.innerGraph, smaller, larger, e.innerLabel,
+      GraphUtil.directions("out"), e.getOp(), e.getVersion(), e.getPropsWithTs(),
+      pendingEdgeOpt = e.getPendingEdgeOpt(), statusCode = e.getStatusCode(), lockTs = e.getLockTs(), tsInnerValOpt = e.getTsInnerValOpt())
+
+    snapshotEdge.property(LabelMeta.timestamp.name, e.ts, e.ts)
+
+    snapshotEdge
+  }
 
   def copyFrom(e: SnapshotEdge): SnapshotEdge = {
     val copy =
@@ -418,8 +429,8 @@ object EdgeMutate {
   }
 }
 
-case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
-                      edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
+case class EdgeMutate(edgesToDelete: Seq[IndexEdge] = Nil,
+                      edgesToInsert: Seq[IndexEdge] = Nil,
                       newSnapshotEdge: Option[SnapshotEdge] = None) {
 
   def deepCopy: EdgeMutate = copy(
@@ -799,5 +810,53 @@ object S2Edge {
 
 //  def fromString(s: String): Option[Edge] = Graph.toEdge(s)
 
+  def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) =
+    if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column
+
+  def srcForVertex(e: S2EdgeLike): S2VertexLike = {
+    val belongLabelIds = Seq(e.labelWithDir.labelId)
+    if (e.labelWithDir.dir == GraphUtil.directions("in")) {
+      val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn)
+      e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
+    } else {
+      val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn)
+      e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
+    }
+  }
 
+  def tgtForVertex(e: S2EdgeLike): S2VertexLike = {
+    val belongLabelIds = Seq(e.labelWithDir.labelId)
+    if (e.labelWithDir.dir == GraphUtil.directions("in")) {
+      val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn)
+      e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
+    } else {
+      val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn)
+      e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
+    }
+  }
+
+  def updatePropsWithTs(e: S2EdgeLike, others: Props = S2Edge.EmptyProps): Props = {
+    val emptyProp = S2Edge.EmptyProps
+
+    e.getPropsWithTs().forEach(new BiConsumer[String, S2Property[_]] {
+      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
+    })
+
+    others.forEach(new BiConsumer[String, S2Property[_]] {
+      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
+    })
+
+    emptyProp
+  }
+
+  def propertyValue(e: S2EdgeLike, key: String): Option[InnerValLikeWithTs] = {
+    key match {
+      case "from" | "_from" => Option(InnerValLikeWithTs(e.srcVertex.innerId, e.ts))
+      case "to" | "_to" => Option(InnerValLikeWithTs(e.tgtVertex.innerId, e.ts))
+      case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(e.innerLabel.label, e.innerLabel.schemaVersion), e.ts))
+      case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(e.direction, e.innerLabel.schemaVersion), e.ts))
+      case _ =>
+        e.innerLabel.metaPropsInvMap.get(key).map(labelMeta => e.propertyValueInner(labelMeta))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
new file mode 100644
index 0000000..ea9598e
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
@@ -0,0 +1,103 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.S2Edge.State
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.types.{InnerValLike, TargetVertexId, VertexId}
+import org.apache.tinkerpop.gremlin.structure.Property
+
+import scala.collection.JavaConverters._
+
+class S2EdgeBuilder(edge: S2EdgeLike) {
+  def srcForVertex = S2Edge.srcForVertex(edge)
+
+  def tgtForVertex = S2Edge.tgtForVertex(edge)
+
+  def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
+
+  def reverseDirEdge = copyEdge(dir = GraphUtil.toggleDir(edge.getDir))
+
+  def reverseSrcTgtEdge = copyEdge(srcVertex = edge.tgtVertex, tgtVertex = edge.srcVertex)
+
+  def isDegree = edge.getPropsWithTs().containsKey(LabelMeta.degree.name)
+
+  def propsPlusTsValid = edge.getPropsWithTs().asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
+
+  def labelOrders = LabelIndex.findByLabelIdAll(edge.labelWithDir.labelId)
+
+  def edgesWithIndex = for (labelOrder <- labelOrders) yield {
+    IndexEdge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, edge.innerLabel, edge.getDir(), edge.getOp(),
+      edge.getVersion(), labelOrder.seq, edge.getPropsWithTs(), tsInnerValOpt = edge.getTsInnerValOpt())
+  }
+
+  def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
+    IndexEdge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, edge.innerLabel, edge.getDir(), edge.getOp(),
+      edge.getVersion(), labelOrder.seq, propsPlusTsValid, tsInnerValOpt = edge.getTsInnerValOpt())
+  }
+
+  def relatedEdges: Seq[S2EdgeLike] = {
+    if (edge.labelWithDir.isDirected) {
+      val skipReverse = edge.innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
+      if (skipReverse) Seq(edge) else Seq(edge, duplicateEdge)
+    } else {
+      //      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
+      //      val base = copy(labelWithDir = outDir)
+      val base = copyEdge(dir = GraphUtil.directions("out"))
+      Seq(base, base.reverseSrcTgtEdge)
+    }
+  }
+
+  def copyEdge(innerGraph: S2Graph = edge.innerGraph,
+               srcVertex: S2VertexLike = edge.srcVertex,
+               tgtVertex: S2VertexLike = edge.tgtVertex,
+               innerLabel: Label = edge.innerLabel,
+               dir: Int = edge.getDir(),
+               op: Byte = edge.getOp(),
+               version: Long = edge.getVersion(),
+               propsWithTs: State = S2Edge.propsToState(edge.getPropsWithTs()),
+               parentEdges: Seq[EdgeWithScore] = edge.getParentEdges(),
+               originalEdgeOpt: Option[S2EdgeLike] = edge.getOriginalEdgeOpt(),
+               pendingEdgeOpt: Option[S2EdgeLike] = edge.getPendingEdgeOpt(),
+               statusCode: Byte = edge.getStatusCode(),
+               lockTs: Option[Long] = edge.getLockTs(),
+               tsInnerValOpt: Option[InnerValLike] = edge.getTsInnerValOpt(),
+               ts: Long = edge.getTs()): S2EdgeLike = {
+    val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps,
+      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+    S2Edge.fillPropsWithTs(edge, propsWithTs)
+    edge.propertyInner(LabelMeta.timestamp.name, ts, ts)
+    edge
+  }
+
+  def copyEdgeWithState(state: State): S2EdgeLike = {
+    val newEdge = new S2Edge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, edge.innerLabel,
+      edge.getDir(), edge.getOp(), edge.getVersion(), S2Edge.EmptyProps,
+      edge.getParentEdges(), edge.getOriginalEdgeOpt(), edge.getPendingEdgeOpt(),
+      edge.getStatusCode(), edge.getLockTs(), edge.getTsInnerValOpt())
+
+    S2Edge.fillPropsWithTs(newEdge, state)
+    newEdge
+  }
+
+  def updateTgtVertex(id: InnerValLike): S2EdgeLike = {
+    val newId = TargetVertexId(edge.tgtVertex.id.column, id)
+    val newTgtVertex = edge.innerGraph.newVertex(newId, edge.tgtVertex.ts, edge.tgtVertex.props)
+    copyEdge(tgtVertex = newTgtVertex)
+  }
+
+  def edgeId: EdgeId = {
+    val timestamp = if (edge.innerLabel.consistencyLevel == "strong") 0l else edge.ts
+    //    EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp)
+    val (srcColumn, tgtColumn) = edge.innerLabel.srcTgtColumn(edge.getDir())
+    if (edge.getDir() == GraphUtil.directions("out"))
+      EdgeId(VertexId(srcColumn, edge.srcVertex.id.innerId), VertexId(tgtColumn, edge.tgtVertex.id.innerId), edge.label(), "out", timestamp)
+    else
+      EdgeId(VertexId(tgtColumn, edge.tgtVertex.id.innerId), VertexId(srcColumn, edge.srcVertex.id.innerId), edge.label(), "out", timestamp)
+  }
+
+  def propertyInner[V](key: String, value: V, ts: Long): Property[V] = {
+    val labelMeta = edge.innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
+    val newProp = new S2Property[V](edge, labelMeta, key, value, ts)
+    edge.getPropsWithTs().put(key, newProp)
+    newProp
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index f823d60..9963be7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -17,6 +17,8 @@ import scala.collection.JavaConverters._
 trait S2EdgeLike extends Edge with GraphElement {
   this: S2Edge =>
 
+  val builder: S2EdgeBuilder = new S2EdgeBuilder(this)
+
   val innerGraph: S2Graph
   val srcVertex: S2VertexLike
   var tgtVertex: S2VertexLike
@@ -48,6 +50,14 @@ trait S2EdgeLike extends Edge with GraphElement {
   lazy val labelName = innerLabel.label
   lazy val direction = GraphUtil.fromDirection(dir)
 
+  def getTs(): Long = ts
+  def getOriginalEdgeOpt(): Option[S2EdgeLike] = originalEdgeOpt
+  def getParentEdges(): Seq[EdgeWithScore] = parentEdges
+  def getPendingEdgeOpt(): Option[S2EdgeLike] = pendingEdgeOpt
+  def getPropsWithTs(): Props = propsWithTs
+  def getLockTs(): Option[Long] = lockTs
+  def getStatusCode(): Byte = statusCode
+  def getDir(): Int = dir
   def setTgtVertex(v: S2VertexLike): Unit = tgtVertex = v
   def getOp(): Byte = op
   def setOp(newOp: Byte): Unit = op = newOp
@@ -60,30 +70,10 @@ trait S2EdgeLike extends Edge with GraphElement {
 
   def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
 
-  def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = {
-    val emptyProp = S2Edge.EmptyProps
-
-    propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
-      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
-    })
+  def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props =
+    S2Edge.updatePropsWithTs(this, others)
 
-    others.forEach(new BiConsumer[String, S2Property[_]] {
-      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
-    })
-
-    emptyProp
-  }
-
-  def propertyValue(key: String): Option[InnerValLikeWithTs] = {
-    key match {
-      case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts))
-      case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts))
-      case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts))
-      case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts))
-      case _ =>
-        innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta))
-    }
-  }
+  def propertyValue(key: String): Option[InnerValLikeWithTs] = S2Edge.propertyValue(this, key)
 
   def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs = {
     //    propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
@@ -116,172 +106,55 @@ trait S2EdgeLike extends Edge with GraphElement {
     }
   }
 
-  lazy val properties = toProps()
-
-  def props = propsWithTs.asScala.mapValues(_.innerVal)
-
-  def relatedEdges = {
-    if (labelWithDir.isDirected) {
-      val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
-      if (skipReverse) List(this) else List(this, duplicateEdge)
-    } else {
-      //      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
-      //      val base = copy(labelWithDir = outDir)
-      val base = copy(dir = GraphUtil.directions("out"))
-      List(base, base.reverseSrcTgtEdge)
-    }
-  }
+  def relatedEdges = builder.relatedEdges
 
-  def srcForVertex = {
-    val belongLabelIds = Seq(labelWithDir.labelId)
-    if (labelWithDir.dir == GraphUtil.directions("in")) {
-      val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
-      innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
-    } else {
-      val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
-      innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
-    }
-  }
+  def srcForVertex = builder.srcForVertex
 
-  def tgtForVertex = {
-    val belongLabelIds = Seq(labelWithDir.labelId)
-    if (labelWithDir.dir == GraphUtil.directions("in")) {
-      val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
-      innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
-    } else {
-      val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
-      innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
-    }
-  }
+  def tgtForVertex = builder.tgtForVertex
 
-  def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
+  def duplicateEdge = builder.duplicateEdge
 
   //  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
-  def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir))
+  def reverseDirEdge = builder.reverseDirEdge
 
-  def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
+  def reverseSrcTgtEdge = builder.reverseSrcTgtEdge
 
-  def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
+  def isDegree = builder.isDegree
 
-  def isDegree = propsWithTs.containsKey(LabelMeta.degree.name)
+  def edgesWithIndex = builder.edgesWithIndex
 
-  def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
-
-  def edgesWithIndex = for (labelOrder <- labelOrders) yield {
-    IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
-  }
-
-  def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
-    IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
-  }
+  def edgesWithIndexValid = builder.edgesWithIndexValid
 
   /** force direction as out on invertedEdge */
-  def toSnapshotEdge: SnapshotEdge = {
-    val (smaller, larger) = (srcForVertex, tgtForVertex)
-
-    //    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
-
-    propertyInner(LabelMeta.timestamp.name, ts, ts)
-    val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel,
-      GraphUtil.directions("out"), op, version, propsWithTs,
-      pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
-    ret
-  }
-
-  def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
-    "label" -> innerLabel.label, "service" -> innerLabel.serviceName)
-
-  def propsWithName =
-    for {
-      (_, v) <- propsWithTs.asScala
-      meta = v.labelMeta
-      jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
-    } yield meta.name -> jsValue
-
-  def updateTgtVertex(id: InnerValLike) = {
-    val newId = TargetVertexId(tgtVertex.id.column, id)
-    val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props)
-    S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
-  }
-
-  def updateOriginalEdgeOpt(newOriginalEdgeOpt: S2EdgeLike): S2EdgeLike = {
-    S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs, parentEdges,
-      Option(newOriginalEdgeOpt), pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
-  }
-
-  def rank(r: RankParam): Double =
-    if (r.keySeqAndWeights.size <= 0) 1.0f
-    else {
-      var sum: Double = 0
-
-      for ((labelMeta, w) <- r.keySeqAndWeights) {
-        if (propsWithTs.containsKey(labelMeta.name)) {
-          val innerValWithTs = propsWithTs.get(labelMeta.name)
-          val cost = try innerValWithTs.innerVal.toString.toDouble catch {
-            case e: Exception =>
-              logger.error("toInnerval failed in rank", e)
-              1.0
-          }
-          sum += w * cost
-        }
-      }
-      sum
-    }
+  def toSnapshotEdge: SnapshotEdge = SnapshotEdge.apply(this)
 
   def checkProperty(key: String): Boolean = propsWithTs.containsKey(key)
 
-  def copyEdge(srcVertex: S2VertexLike = srcVertex,
-               tgtVertex: S2VertexLike = tgtVertex,
-               innerLabel: Label = innerLabel,
-               dir: Int = dir,
-               op: Byte = op,
-               version: Long = version,
-               propsWithTs: State = S2Edge.propsToState(this.propsWithTs),
-               parentEdges: Seq[EdgeWithScore] = parentEdges,
-               originalEdgeOpt: Option[S2EdgeLike] = originalEdgeOpt,
-               pendingEdgeOpt: Option[S2EdgeLike] = pendingEdgeOpt,
-               statusCode: Byte = statusCode,
-               lockTs: Option[Long] = lockTs,
-               tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
-               ts: Long = ts): S2EdgeLike = {
-    val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps,
-      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
-    S2Edge.fillPropsWithTs(edge, propsWithTs)
-    edge.propertyInner(LabelMeta.timestamp.name, ts, ts)
-    edge
-  }
-
-  def copyEdgeWithState(state: State, ts: Long): S2EdgeLike = {
-    val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
-    S2Edge.fillPropsWithTs(newEdge, state)
-    newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts)
-    newEdge
-  }
-
   def copyEdgeWithState(state: State): S2EdgeLike = {
-    val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
-    S2Edge.fillPropsWithTs(newEdge, state)
-    newEdge
+    builder.copyEdgeWithState(state)
   }
 
   def copyOp(newOp: Byte): S2EdgeLike = {
-    copy(op = newOp)
+    builder.copyEdge(op = newOp)
   }
 
   def copyVersion(newVersion: Long): S2EdgeLike = {
-    copy(version = newVersion)
+    builder.copyEdge(version = newVersion)
   }
 
   def copyParentEdges(parents: Seq[EdgeWithScore]): S2EdgeLike = {
-    copy(parentEdges = parents)
+    builder.copyEdge(parentEdges = parents)
   }
 
+  def copyOriginalEdgeOpt(newOriginalEdgeOpt: Option[S2EdgeLike]): S2EdgeLike =
+    builder.copyEdge(originalEdgeOpt = newOriginalEdgeOpt)
+
   def copyStatusCode(newStatusCode: Byte): S2EdgeLike = {
-    copy(statusCode = newStatusCode)
+    builder.copyEdge(statusCode = newStatusCode)
   }
 
   def copyLockTs(newLockTs: Option[Long]): S2EdgeLike = {
-    copy(lockTs = newLockTs)
+    builder.copyEdge(lockTs = newLockTs)
   }
 
   def vertices(direction: Direction): util.Iterator[structure.Vertex] = {
@@ -289,19 +162,9 @@ trait S2EdgeLike extends Edge with GraphElement {
 
     direction match {
       case Direction.OUT =>
-        //        val newVertexId = this.direction match {
-        //          case "out" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
-        //          case "in" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
-        //          case _ => throw new IllegalArgumentException("direction can only be out/in.")
-        //        }
         val newVertexId = edgeId.srcVertexId
         innerGraph.getVertex(newVertexId).foreach(arr.add)
       case Direction.IN =>
-        //        val newVertexId = this.direction match {
-        //          case "in" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
-        //          case "out" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
-        //          case _ => throw new IllegalArgumentException("direction can only be out/in.")
-        //        }
         val newVertexId = edgeId.tgtVertexId
         innerGraph.getVertex(newVertexId).foreach(arr.add)
       case _ =>
@@ -331,39 +194,33 @@ trait S2EdgeLike extends Edge with GraphElement {
   }
 
   def property[V](key: String): Property[V] = {
-    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new java.lang.IllegalStateException(s"$key is not configured on Edge."))
     if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]]
-    else {
-      Property.empty()
-      //      val default = innerLabel.metaPropsDefaultMapInner(labelMeta)
-      //      propertyInner(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]]
-    }
+    else Property.empty()
   }
 
-  // just for tinkerpop: save to storage, do not use for internal
   def property[V](key: String, value: V): Property[V] = {
     S2Property.assertValidProp(key, value)
 
     val v = propertyInner(key, value, System.currentTimeMillis())
-    val newTs = props.get(LabelMeta.timestamp.name).map(_.toString.toLong + 1).getOrElse(System.currentTimeMillis())
-    val newEdge = this.copyEdge(ts = newTs)
+    val newTs =
+      if (propsWithTs.containsKey(LabelMeta.timestamp.name))
+        propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong + 1
+      else
+        System.currentTimeMillis()
+
+    val newEdge = builder.copyEdge(ts = newTs)
 
     Await.result(innerGraph.mutateEdges(Seq(newEdge), withWait = true), innerGraph.WaitTimeout)
 
     v
   }
 
-  def propertyInner[V](key: String, value: V, ts: Long): Property[V] = {
-    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
-    val newProp = new S2Property[V](this, labelMeta, key, value, ts)
-    propsWithTs.put(key, newProp)
-    newProp
-  }
+  def propertyInner[V](key: String, value: V, ts: Long): Property[V] = builder.propertyInner(key, value, ts)
 
   def remove(): Unit = {
     if (graph.features().edge().supportsRemoveEdges()) {
       val requestTs = System.currentTimeMillis()
-      val edgeToDelete = this.copyEdge(op = GraphUtil.operations("delete"),
+      val edgeToDelete = builder.copyEdge(op = GraphUtil.operations("delete"),
         version = version + S2Edge.incrementVersion, propsWithTs = S2Edge.propsToState(updatePropsWithTs()), ts = requestTs)
       // should we delete related edges also?
       val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true)
@@ -376,43 +233,14 @@ trait S2EdgeLike extends Edge with GraphElement {
 
   def graph(): Graph = innerGraph
 
-  lazy val edgeId: EdgeId = {
-    // NOTE: xxxForVertex makes direction to be "out"
-    val timestamp = if (this.innerLabel.consistencyLevel == "strong") 0l else ts
-    //    EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp)
-    val (srcColumn, tgtColumn) = innerLabel.srcTgtColumn(dir)
-    if (direction == "out")
-      EdgeId(VertexId(srcColumn, srcVertex.id.innerId), VertexId(tgtColumn, tgtVertex.id.innerId), label(), "out", timestamp)
-    else
-      EdgeId(VertexId(tgtColumn, tgtVertex.id.innerId), VertexId(srcColumn, srcVertex.id.innerId), label(), "out", timestamp)
-  }
+  lazy val edgeId: EdgeId = builder.edgeId
 
   def id(): AnyRef = edgeId
 
   def label(): String = innerLabel.label
 
-  private def toProps(): Map[String, Any] = {
-    for {
-      (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner
-    } yield {
-      //      labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value
-      val value =
-        if (propsWithTs.containsKey(labelMeta.name)) {
-          propsWithTs.get(labelMeta.name).value
-        } else {
-          defaultVal.innerVal.value
-        }
-      labelMeta.name -> value
-    }
-  }
-
   def toLogString: String = {
     //    val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
     List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
   }
-
-  private def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) =
-    if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 28b15d0..1b9c94b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -121,7 +121,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
           if where == WhereParser.success || where.filter(edge)
           convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
         } yield {
-          val score = edge.rank(queryParam.rank)
+          val score = queryParam.rank.score(edge)
           EdgeWithScore(convertedEdge, score, label)
         }
         StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
@@ -134,7 +134,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
           if where == WhereParser.success || where.filter(edge)
           convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
         } yield {
-          val edgeScore = edge.rank(queryParam.rank)
+          val edgeScore = queryParam.rank.score(edge)
           val score = queryParam.scorePropagateOp match {
             case "plus" => edgeScore + prevScore
             case "divide" =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
index e2621af..79c9dc3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
@@ -30,17 +30,17 @@ class MutationHelper(storage: Storage) {
         val edge = edgeWithScore.edge
         val score = edgeWithScore.score
 
-        val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+        val edgeSnapshot = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
         val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
 
-        val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+        val edgeForward = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
         val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
           serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
             io.buildIncrementsAsync(indexEdge, -1L)
         }
 
         /* reverted direction */
-        val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+        val edgeRevert = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
         val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
           serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
             io.buildIncrementsAsync(indexEdge, -1L)