You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/11/19 02:28:54 UTC
[11/23] incubator-s2graph git commit: add S2EdgeBuilder.
add S2EdgeBuilder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2b5df1dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2b5df1dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2b5df1dd
Branch: refs/heads/master
Commit: 2b5df1dd0ddc913dca92353b191e8982368f08c0
Parents: 7d08225
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Nov 6 23:08:25 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Tue Nov 7 00:50:54 2017 +0900
----------------------------------------------------------------------
.../s2graph/core/GraphElementBuilder.scala | 4 +-
.../org/apache/s2graph/core/QueryParam.scala | 21 +-
.../scala/org/apache/s2graph/core/S2Edge.scala | 63 ++++-
.../org/apache/s2graph/core/S2EdgeBuilder.scala | 103 ++++++++
.../org/apache/s2graph/core/S2EdgeLike.scala | 258 ++++---------------
.../apache/s2graph/core/storage/StorageIO.scala | 4 +-
.../core/storage/serde/MutationHelper.scala | 6 +-
7 files changed, 234 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
index 21179aa..c9133b1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
@@ -279,10 +279,10 @@ class GraphElementBuilder(graph: S2Graph) {
val edge = edgeWithScore.edge
val copiedEdge = label.consistencyLevel match {
case "strong" =>
- edge.copyEdge(op = GraphUtil.operations("delete"),
+ edge.builder.copyEdge(op = GraphUtil.operations("delete"),
version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
case _ =>
- edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
+ edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
}
val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index e98ef37..3d0d076 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -185,7 +185,7 @@ case class EdgeTransformer(jsValue: JsValue) {
replace(queryParam, fmt, fieldNames.flatMap(fieldName => toInnerValOpt(queryParam, edge, fieldName)), nextStepOpt)
}
}
- } yield edge.updateTgtVertex(innerVal).copy(originalEdgeOpt = Option(edge))
+ } yield edge.builder.updateTgtVertex(innerVal).copyOriginalEdgeOpt(Option(edge))
edges
@@ -251,7 +251,26 @@ case class RankParam(keySeqAndWeights: Seq[(LabelMeta, Double)] = Seq((LabelMeta
}
bytes
}
+
+ def score(edge: S2EdgeLike): Double = {
+ if (keySeqAndWeights.size <= 0) 1.0f
+ else {
+ var sum: Double = 0
+
+ for ((labelMeta, w) <- keySeqAndWeights) {
+ if (edge.getPropsWithTs().containsKey(labelMeta.name)) {
+ val innerValWithTs = edge.getPropsWithTs().get(labelMeta.name)
+ val cost = try innerValWithTs.innerVal.toString.toDouble catch {
+ case e: Exception => 1.0
+ }
+ sum += w * cost
+ }
+ }
+ sum
+ }
+ }
}
+
object QueryParam {
lazy val Empty = QueryParam(labelName = "")
lazy val DefaultThreshold = Double.MinValue
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 3529991..03678c8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -40,6 +40,17 @@ import scala.concurrent.Await
import scala.util.hashing.MurmurHash3
object SnapshotEdge {
+ def apply(e: S2EdgeLike): SnapshotEdge = {
+ val (smaller, larger) = (e.srcForVertex, e.tgtForVertex)
+
+ val snapshotEdge = SnapshotEdge(e.innerGraph, smaller, larger, e.innerLabel,
+ GraphUtil.directions("out"), e.getOp(), e.getVersion(), e.getPropsWithTs(),
+ pendingEdgeOpt = e.getPendingEdgeOpt(), statusCode = e.getStatusCode(), lockTs = e.getLockTs(), tsInnerValOpt = e.getTsInnerValOpt())
+
+ snapshotEdge.property(LabelMeta.timestamp.name, e.ts, e.ts)
+
+ snapshotEdge
+ }
def copyFrom(e: SnapshotEdge): SnapshotEdge = {
val copy =
@@ -418,8 +429,8 @@ object EdgeMutate {
}
}
-case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
- edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
+case class EdgeMutate(edgesToDelete: Seq[IndexEdge] = Nil,
+ edgesToInsert: Seq[IndexEdge] = Nil,
newSnapshotEdge: Option[SnapshotEdge] = None) {
def deepCopy: EdgeMutate = copy(
@@ -799,5 +810,53 @@ object S2Edge {
// def fromString(s: String): Option[Edge] = Graph.toEdge(s)
+ def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) =
+ if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column
+
+ def srcForVertex(e: S2EdgeLike): S2VertexLike = {
+ val belongLabelIds = Seq(e.labelWithDir.labelId)
+ if (e.labelWithDir.dir == GraphUtil.directions("in")) {
+ val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn)
+ e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
+ } else {
+ val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn)
+ e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
+ }
+ }
+ def tgtForVertex(e: S2EdgeLike): S2VertexLike = {
+ val belongLabelIds = Seq(e.labelWithDir.labelId)
+ if (e.labelWithDir.dir == GraphUtil.directions("in")) {
+ val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn)
+ e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
+ } else {
+ val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn)
+ e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
+ }
+ }
+
+ def updatePropsWithTs(e: S2EdgeLike, others: Props = S2Edge.EmptyProps): Props = {
+ val emptyProp = S2Edge.EmptyProps
+
+ e.getPropsWithTs().forEach(new BiConsumer[String, S2Property[_]] {
+ override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
+ })
+
+ others.forEach(new BiConsumer[String, S2Property[_]] {
+ override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
+ })
+
+ emptyProp
+ }
+
+ def propertyValue(e: S2EdgeLike, key: String): Option[InnerValLikeWithTs] = {
+ key match {
+ case "from" | "_from" => Option(InnerValLikeWithTs(e.srcVertex.innerId, e.ts))
+ case "to" | "_to" => Option(InnerValLikeWithTs(e.tgtVertex.innerId, e.ts))
+ case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(e.innerLabel.label, e.innerLabel.schemaVersion), e.ts))
+ case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(e.direction, e.innerLabel.schemaVersion), e.ts))
+ case _ =>
+ e.innerLabel.metaPropsInvMap.get(key).map(labelMeta => e.propertyValueInner(labelMeta))
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
new file mode 100644
index 0000000..ea9598e
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
@@ -0,0 +1,103 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.S2Edge.State
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.types.{InnerValLike, TargetVertexId, VertexId}
+import org.apache.tinkerpop.gremlin.structure.Property
+
+import scala.collection.JavaConverters._
+
+class S2EdgeBuilder(edge: S2EdgeLike) {
+ def srcForVertex = S2Edge.srcForVertex(edge)
+
+ def tgtForVertex = S2Edge.tgtForVertex(edge)
+
+ def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
+
+ def reverseDirEdge = copyEdge(dir = GraphUtil.toggleDir(edge.getDir))
+
+ def reverseSrcTgtEdge = copyEdge(srcVertex = edge.tgtVertex, tgtVertex = edge.srcVertex)
+
+ def isDegree = edge.getPropsWithTs().containsKey(LabelMeta.degree.name)
+
+ def propsPlusTsValid = edge.getPropsWithTs().asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
+
+ def labelOrders = LabelIndex.findByLabelIdAll(edge.labelWithDir.labelId)
+
+ def edgesWithIndex = for (labelOrder <- labelOrders) yield {
+ IndexEdge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, edge.innerLabel, edge.getDir(), edge.getOp(),
+ edge.getVersion(), labelOrder.seq, edge.getPropsWithTs(), tsInnerValOpt = edge.getTsInnerValOpt())
+ }
+
+ def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
+ IndexEdge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, edge.innerLabel, edge.getDir(), edge.getOp(),
+ edge.getVersion(), labelOrder.seq, propsPlusTsValid, tsInnerValOpt = edge.getTsInnerValOpt())
+ }
+
+ def relatedEdges: Seq[S2EdgeLike] = {
+ if (edge.labelWithDir.isDirected) {
+ val skipReverse = edge.innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
+ if (skipReverse) Seq(edge) else Seq(edge, duplicateEdge)
+ } else {
+ // val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
+ // val base = copy(labelWithDir = outDir)
+ val base = copyEdge(dir = GraphUtil.directions("out"))
+ Seq(base, base.reverseSrcTgtEdge)
+ }
+ }
+
+ def copyEdge(innerGraph: S2Graph = edge.innerGraph,
+ srcVertex: S2VertexLike = edge.srcVertex,
+ tgtVertex: S2VertexLike = edge.tgtVertex,
+ innerLabel: Label = edge.innerLabel,
+ dir: Int = edge.getDir(),
+ op: Byte = edge.getOp(),
+ version: Long = edge.getVersion(),
+ propsWithTs: State = S2Edge.propsToState(edge.getPropsWithTs()),
+ parentEdges: Seq[EdgeWithScore] = edge.getParentEdges(),
+ originalEdgeOpt: Option[S2EdgeLike] = edge.getOriginalEdgeOpt(),
+ pendingEdgeOpt: Option[S2EdgeLike] = edge.getPendingEdgeOpt(),
+ statusCode: Byte = edge.getStatusCode(),
+ lockTs: Option[Long] = edge.getLockTs(),
+ tsInnerValOpt: Option[InnerValLike] = edge.getTsInnerValOpt(),
+ ts: Long = edge.getTs()): S2EdgeLike = {
+ val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps,
+ parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+ S2Edge.fillPropsWithTs(edge, propsWithTs)
+ edge.propertyInner(LabelMeta.timestamp.name, ts, ts)
+ edge
+ }
+
+ def copyEdgeWithState(state: State): S2EdgeLike = {
+ val newEdge = new S2Edge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, edge.innerLabel,
+ edge.getDir(), edge.getOp(), edge.getVersion(), S2Edge.EmptyProps,
+ edge.getParentEdges(), edge.getOriginalEdgeOpt(), edge.getPendingEdgeOpt(),
+ edge.getStatusCode(), edge.getLockTs(), edge.getTsInnerValOpt())
+
+ S2Edge.fillPropsWithTs(newEdge, state)
+ newEdge
+ }
+
+ def updateTgtVertex(id: InnerValLike): S2EdgeLike = {
+ val newId = TargetVertexId(edge.tgtVertex.id.column, id)
+ val newTgtVertex = edge.innerGraph.newVertex(newId, edge.tgtVertex.ts, edge.tgtVertex.props)
+ copyEdge(tgtVertex = newTgtVertex)
+ }
+
+ def edgeId: EdgeId = {
+ val timestamp = if (edge.innerLabel.consistencyLevel == "strong") 0l else edge.ts
+ // EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp)
+ val (srcColumn, tgtColumn) = edge.innerLabel.srcTgtColumn(edge.getDir())
+ if (edge.getDir() == GraphUtil.directions("out"))
+ EdgeId(VertexId(srcColumn, edge.srcVertex.id.innerId), VertexId(tgtColumn, edge.tgtVertex.id.innerId), edge.label(), "out", timestamp)
+ else
+ EdgeId(VertexId(tgtColumn, edge.tgtVertex.id.innerId), VertexId(srcColumn, edge.srcVertex.id.innerId), edge.label(), "out", timestamp)
+ }
+
+ def propertyInner[V](key: String, value: V, ts: Long): Property[V] = {
+ val labelMeta = edge.innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
+ val newProp = new S2Property[V](edge, labelMeta, key, value, ts)
+ edge.getPropsWithTs().put(key, newProp)
+ newProp
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index f823d60..9963be7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -17,6 +17,8 @@ import scala.collection.JavaConverters._
trait S2EdgeLike extends Edge with GraphElement {
this: S2Edge =>
+ val builder: S2EdgeBuilder = new S2EdgeBuilder(this)
+
val innerGraph: S2Graph
val srcVertex: S2VertexLike
var tgtVertex: S2VertexLike
@@ -48,6 +50,14 @@ trait S2EdgeLike extends Edge with GraphElement {
lazy val labelName = innerLabel.label
lazy val direction = GraphUtil.fromDirection(dir)
+ def getTs(): Long = ts
+ def getOriginalEdgeOpt(): Option[S2EdgeLike] = originalEdgeOpt
+ def getParentEdges(): Seq[EdgeWithScore] = parentEdges
+ def getPendingEdgeOpt(): Option[S2EdgeLike] = pendingEdgeOpt
+ def getPropsWithTs(): Props = propsWithTs
+ def getLockTs(): Option[Long] = lockTs
+ def getStatusCode(): Byte = statusCode
+ def getDir(): Int = dir
def setTgtVertex(v: S2VertexLike): Unit = tgtVertex = v
def getOp(): Byte = op
def setOp(newOp: Byte): Unit = op = newOp
@@ -60,30 +70,10 @@ trait S2EdgeLike extends Edge with GraphElement {
def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
- def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = {
- val emptyProp = S2Edge.EmptyProps
-
- propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
- override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
- })
+ def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props =
+ S2Edge.updatePropsWithTs(this, others)
- others.forEach(new BiConsumer[String, S2Property[_]] {
- override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
- })
-
- emptyProp
- }
-
- def propertyValue(key: String): Option[InnerValLikeWithTs] = {
- key match {
- case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts))
- case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts))
- case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts))
- case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts))
- case _ =>
- innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta))
- }
- }
+ def propertyValue(key: String): Option[InnerValLikeWithTs] = S2Edge.propertyValue(this, key)
def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs = {
// propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
@@ -116,172 +106,55 @@ trait S2EdgeLike extends Edge with GraphElement {
}
}
- lazy val properties = toProps()
-
- def props = propsWithTs.asScala.mapValues(_.innerVal)
-
- def relatedEdges = {
- if (labelWithDir.isDirected) {
- val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
- if (skipReverse) List(this) else List(this, duplicateEdge)
- } else {
- // val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
- // val base = copy(labelWithDir = outDir)
- val base = copy(dir = GraphUtil.directions("out"))
- List(base, base.reverseSrcTgtEdge)
- }
- }
+ def relatedEdges = builder.relatedEdges
- def srcForVertex = {
- val belongLabelIds = Seq(labelWithDir.labelId)
- if (labelWithDir.dir == GraphUtil.directions("in")) {
- val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
- innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
- } else {
- val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
- innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
- }
- }
+ def srcForVertex = builder.srcForVertex
- def tgtForVertex = {
- val belongLabelIds = Seq(labelWithDir.labelId)
- if (labelWithDir.dir == GraphUtil.directions("in")) {
- val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
- innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
- } else {
- val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
- innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
- }
- }
+ def tgtForVertex = builder.tgtForVertex
- def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
+ def duplicateEdge = builder.duplicateEdge
// def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
- def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir))
+ def reverseDirEdge = builder.reverseDirEdge
- def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
+ def reverseSrcTgtEdge = builder.reverseSrcTgtEdge
- def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
+ def isDegree = builder.isDegree
- def isDegree = propsWithTs.containsKey(LabelMeta.degree.name)
+ def edgesWithIndex = builder.edgesWithIndex
- def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
-
- def edgesWithIndex = for (labelOrder <- labelOrders) yield {
- IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
- }
-
- def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
- IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
- }
+ def edgesWithIndexValid = builder.edgesWithIndexValid
/** force direction as out on invertedEdge */
- def toSnapshotEdge: SnapshotEdge = {
- val (smaller, larger) = (srcForVertex, tgtForVertex)
-
- // val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
-
- propertyInner(LabelMeta.timestamp.name, ts, ts)
- val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel,
- GraphUtil.directions("out"), op, version, propsWithTs,
- pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
- ret
- }
-
- def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
- "label" -> innerLabel.label, "service" -> innerLabel.serviceName)
-
- def propsWithName =
- for {
- (_, v) <- propsWithTs.asScala
- meta = v.labelMeta
- jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
- } yield meta.name -> jsValue
-
- def updateTgtVertex(id: InnerValLike) = {
- val newId = TargetVertexId(tgtVertex.id.column, id)
- val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props)
- S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
- }
-
- def updateOriginalEdgeOpt(newOriginalEdgeOpt: S2EdgeLike): S2EdgeLike = {
- S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs, parentEdges,
- Option(newOriginalEdgeOpt), pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
- }
-
- def rank(r: RankParam): Double =
- if (r.keySeqAndWeights.size <= 0) 1.0f
- else {
- var sum: Double = 0
-
- for ((labelMeta, w) <- r.keySeqAndWeights) {
- if (propsWithTs.containsKey(labelMeta.name)) {
- val innerValWithTs = propsWithTs.get(labelMeta.name)
- val cost = try innerValWithTs.innerVal.toString.toDouble catch {
- case e: Exception =>
- logger.error("toInnerval failed in rank", e)
- 1.0
- }
- sum += w * cost
- }
- }
- sum
- }
+ def toSnapshotEdge: SnapshotEdge = SnapshotEdge.apply(this)
def checkProperty(key: String): Boolean = propsWithTs.containsKey(key)
- def copyEdge(srcVertex: S2VertexLike = srcVertex,
- tgtVertex: S2VertexLike = tgtVertex,
- innerLabel: Label = innerLabel,
- dir: Int = dir,
- op: Byte = op,
- version: Long = version,
- propsWithTs: State = S2Edge.propsToState(this.propsWithTs),
- parentEdges: Seq[EdgeWithScore] = parentEdges,
- originalEdgeOpt: Option[S2EdgeLike] = originalEdgeOpt,
- pendingEdgeOpt: Option[S2EdgeLike] = pendingEdgeOpt,
- statusCode: Byte = statusCode,
- lockTs: Option[Long] = lockTs,
- tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
- ts: Long = ts): S2EdgeLike = {
- val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps,
- parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
- S2Edge.fillPropsWithTs(edge, propsWithTs)
- edge.propertyInner(LabelMeta.timestamp.name, ts, ts)
- edge
- }
-
- def copyEdgeWithState(state: State, ts: Long): S2EdgeLike = {
- val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
- S2Edge.fillPropsWithTs(newEdge, state)
- newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts)
- newEdge
- }
-
def copyEdgeWithState(state: State): S2EdgeLike = {
- val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
- S2Edge.fillPropsWithTs(newEdge, state)
- newEdge
+ builder.copyEdgeWithState(state)
}
def copyOp(newOp: Byte): S2EdgeLike = {
- copy(op = newOp)
+ builder.copyEdge(op = newOp)
}
def copyVersion(newVersion: Long): S2EdgeLike = {
- copy(version = newVersion)
+ builder.copyEdge(version = newVersion)
}
def copyParentEdges(parents: Seq[EdgeWithScore]): S2EdgeLike = {
- copy(parentEdges = parents)
+ builder.copyEdge(parentEdges = parents)
}
+ def copyOriginalEdgeOpt(newOriginalEdgeOpt: Option[S2EdgeLike]): S2EdgeLike =
+ builder.copyEdge(originalEdgeOpt = newOriginalEdgeOpt)
+
def copyStatusCode(newStatusCode: Byte): S2EdgeLike = {
- copy(statusCode = newStatusCode)
+ builder.copyEdge(statusCode = newStatusCode)
}
def copyLockTs(newLockTs: Option[Long]): S2EdgeLike = {
- copy(lockTs = newLockTs)
+ builder.copyEdge(lockTs = newLockTs)
}
def vertices(direction: Direction): util.Iterator[structure.Vertex] = {
@@ -289,19 +162,9 @@ trait S2EdgeLike extends Edge with GraphElement {
direction match {
case Direction.OUT =>
- // val newVertexId = this.direction match {
- // case "out" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
- // case "in" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
- // case _ => throw new IllegalArgumentException("direction can only be out/in.")
- // }
val newVertexId = edgeId.srcVertexId
innerGraph.getVertex(newVertexId).foreach(arr.add)
case Direction.IN =>
- // val newVertexId = this.direction match {
- // case "in" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
- // case "out" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
- // case _ => throw new IllegalArgumentException("direction can only be out/in.")
- // }
val newVertexId = edgeId.tgtVertexId
innerGraph.getVertex(newVertexId).foreach(arr.add)
case _ =>
@@ -331,39 +194,33 @@ trait S2EdgeLike extends Edge with GraphElement {
}
def property[V](key: String): Property[V] = {
- val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new java.lang.IllegalStateException(s"$key is not configured on Edge."))
if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]]
- else {
- Property.empty()
- // val default = innerLabel.metaPropsDefaultMapInner(labelMeta)
- // propertyInner(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]]
- }
+ else Property.empty()
}
- // just for tinkerpop: save to storage, do not use for internal
def property[V](key: String, value: V): Property[V] = {
S2Property.assertValidProp(key, value)
val v = propertyInner(key, value, System.currentTimeMillis())
- val newTs = props.get(LabelMeta.timestamp.name).map(_.toString.toLong + 1).getOrElse(System.currentTimeMillis())
- val newEdge = this.copyEdge(ts = newTs)
+ val newTs =
+ if (propsWithTs.containsKey(LabelMeta.timestamp.name))
+ propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong + 1
+ else
+ System.currentTimeMillis()
+
+ val newEdge = builder.copyEdge(ts = newTs)
Await.result(innerGraph.mutateEdges(Seq(newEdge), withWait = true), innerGraph.WaitTimeout)
v
}
- def propertyInner[V](key: String, value: V, ts: Long): Property[V] = {
- val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
- val newProp = new S2Property[V](this, labelMeta, key, value, ts)
- propsWithTs.put(key, newProp)
- newProp
- }
+ def propertyInner[V](key: String, value: V, ts: Long): Property[V] = builder.propertyInner(key, value, ts)
def remove(): Unit = {
if (graph.features().edge().supportsRemoveEdges()) {
val requestTs = System.currentTimeMillis()
- val edgeToDelete = this.copyEdge(op = GraphUtil.operations("delete"),
+ val edgeToDelete = builder.copyEdge(op = GraphUtil.operations("delete"),
version = version + S2Edge.incrementVersion, propsWithTs = S2Edge.propsToState(updatePropsWithTs()), ts = requestTs)
// should we delete related edges also?
val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true)
@@ -376,43 +233,14 @@ trait S2EdgeLike extends Edge with GraphElement {
def graph(): Graph = innerGraph
- lazy val edgeId: EdgeId = {
- // NOTE: xxxForVertex makes direction to be "out"
- val timestamp = if (this.innerLabel.consistencyLevel == "strong") 0l else ts
- // EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp)
- val (srcColumn, tgtColumn) = innerLabel.srcTgtColumn(dir)
- if (direction == "out")
- EdgeId(VertexId(srcColumn, srcVertex.id.innerId), VertexId(tgtColumn, tgtVertex.id.innerId), label(), "out", timestamp)
- else
- EdgeId(VertexId(tgtColumn, tgtVertex.id.innerId), VertexId(srcColumn, srcVertex.id.innerId), label(), "out", timestamp)
- }
+ lazy val edgeId: EdgeId = builder.edgeId
def id(): AnyRef = edgeId
def label(): String = innerLabel.label
- private def toProps(): Map[String, Any] = {
- for {
- (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner
- } yield {
- // labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value
- val value =
- if (propsWithTs.containsKey(labelMeta.name)) {
- propsWithTs.get(labelMeta.name).value
- } else {
- defaultVal.innerVal.value
- }
- labelMeta.name -> value
- }
- }
-
def toLogString: String = {
// val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
}
-
- private def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) =
- if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 28b15d0..1b9c94b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -121,7 +121,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
if where == WhereParser.success || where.filter(edge)
convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
} yield {
- val score = edge.rank(queryParam.rank)
+ val score = queryParam.rank.score(edge)
EdgeWithScore(convertedEdge, score, label)
}
StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
@@ -134,7 +134,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
if where == WhereParser.success || where.filter(edge)
convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
} yield {
- val edgeScore = edge.rank(queryParam.rank)
+ val edgeScore = queryParam.rank.score(edge)
val score = queryParam.scorePropagateOp match {
case "plus" => edgeScore + prevScore
case "divide" =>
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
index e2621af..79c9dc3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
@@ -30,17 +30,17 @@ class MutationHelper(storage: Storage) {
val edge = edgeWithScore.edge
val score = edgeWithScore.score
- val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+ val edgeSnapshot = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
- val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+ val edgeForward = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
io.buildIncrementsAsync(indexEdge, -1L)
}
/* reverted direction */
- val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+ val edgeRevert = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
io.buildIncrementsAsync(indexEdge, -1L)