You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/11/19 02:28:50 UTC
[07/23] incubator-s2graph git commit: add GraphElementBuilder.
add GraphElementBuilder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/bc26642b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/bc26642b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/bc26642b
Branch: refs/heads/master
Commit: bc26642bc8c3c738205c8bc6c8239f7c0c78254b
Parents: 42b7702
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 3 21:11:13 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Nov 3 21:11:13 2017 +0900
----------------------------------------------------------------------
.../s2graph/core/GraphElementBuilder.scala | 294 +++++++++++++++++++
1 file changed, 294 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bc26642b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
new file mode 100644
index 0000000..21179aa
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
@@ -0,0 +1,294 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
+import org.apache.s2graph.core.JSONParser.{fromJsonToProperties, toInnerVal}
+import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.{JsObject, Json}
+
+import scala.util.Try
+
+class GraphElementBuilder(graph: S2Graph) {
+
+ def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
+ val parts = GraphUtil.split(s)
+ val logType = parts(2)
+ val element = if (logType == "edge" | logType == "e") {
+ /* current only edge is considered to be bulk loaded */
+ labelMapping.get(parts(5)) match {
+ case None =>
+ case Some(toReplace) =>
+ parts(5) = toReplace
+ }
+ toEdge(parts)
+ } else if (logType == "vertex" | logType == "v") {
+ toVertex(parts)
+ } else {
+ throw new GraphExceptions.JsonParseException("log type is not exist in log.")
+ }
+
+ element
+ } recover {
+ case e: Exception =>
+ logger.error(s"[toElement]: $e", e)
+ None
+ } get
+
+
+ def toVertex(s: String): Option[S2VertexLike] = {
+ toVertex(GraphUtil.split(s))
+ }
+
+ def toEdge(s: String): Option[S2EdgeLike] = {
+ toEdge(GraphUtil.split(s))
+ }
+
+ def toEdge(parts: Array[String]): Option[S2EdgeLike] = Try {
+ val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+ val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
+ val tempDirection = if (parts.length >= 8) parts(7) else "out"
+ val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
+ val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
+ Option(edge)
+ } recover {
+ case e: Exception =>
+ logger.error(s"[toEdge]: $e", e)
+ throw e
+ } get
+
+ def toVertex(parts: Array[String]): Option[S2VertexLike] = Try {
+ val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+ val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
+ val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
+ Option(vertex)
+ } recover {
+ case e: Throwable =>
+ logger.error(s"[toVertex]: $e", e)
+ throw e
+ } get
+
+ def toEdge(srcId: Any,
+ tgtId: Any,
+ labelName: String,
+ direction: String,
+ props: Map[String, Any] = Map.empty,
+ ts: Long = System.currentTimeMillis(),
+ operation: String = "insert"): S2EdgeLike = {
+ val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+
+ val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn
+ val tgtColumn = if (direction == "out") label.tgtColumn else label.srcColumn
+
+ val srcVertexIdInnerVal = toInnerVal(srcId, srcColumn.columnType, label.schemaVersion)
+ val tgtVertexIdInnerVal = toInnerVal(tgtId, tgtColumn.columnType, label.schemaVersion)
+
+ val srcVertex = newVertex(new SourceVertexId(srcColumn, srcVertexIdInnerVal), System.currentTimeMillis())
+ val tgtVertex = newVertex(new TargetVertexId(tgtColumn, tgtVertexIdInnerVal), System.currentTimeMillis())
+ val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+
+ val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
+ val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+ val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+ new S2Edge(graph, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs)
+ }
+
+ def toVertex(serviceName: String,
+ columnName: String,
+ id: Any,
+ props: Map[String, Any] = Map.empty,
+ ts: Long = System.currentTimeMillis(),
+ operation: String = "insert"): S2VertexLike = {
+
+ val service = Service.findByName(serviceName).getOrElse(throw new java.lang.IllegalArgumentException(s"$serviceName is not found."))
+ val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new java.lang.IllegalArgumentException(s"$columnName is not found."))
+ val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+ val srcVertexId = id match {
+ case vid: VertexId => id.asInstanceOf[VertexId]
+ case _ => new VertexId(column, toInnerVal(id, column.columnType, column.schemaVersion))
+ }
+
+ val propsInner = column.propsToInnerVals(props) ++
+ Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion))
+
+ val vertex = new S2Vertex(graph, srcVertexId, ts, S2Vertex.EmptyProps, op)
+ S2Vertex.fillPropsWithTs(vertex, propsInner)
+ vertex
+ }
+
+
+ /**
+ * helper to create new Edge instance from given parameters on memory(not actually stored in storage).
+ *
+ * Since we are using mutable map for property value(propsWithTs),
+ * we should make sure that reference for mutable map never be shared between multiple Edge instances.
+ * To guarantee this, we never create Edge directly, but rather use this helper which is available on S2Graph.
+ *
+ * Note that we are using following convention
+ * 1. `add*` for method that actually store instance into storage,
+ * 2. `new*` for method that only create instance on memory, but not store it into storage.
+ *
+ * @param srcVertex
+ * @param tgtVertex
+ * @param innerLabel
+ * @param dir
+ * @param op
+ * @param version
+ * @param propsWithTs
+ * @param parentEdges
+ * @param originalEdgeOpt
+ * @param pendingEdgeOpt
+ * @param statusCode
+ * @param lockTs
+ * @param tsInnerValOpt
+ * @return
+ */
+ def newEdge(srcVertex: S2VertexLike,
+ tgtVertex: S2VertexLike,
+ innerLabel: Label,
+ dir: Int,
+ op: Byte = GraphUtil.defaultOpByte,
+ version: Long = System.currentTimeMillis(),
+ propsWithTs: S2Edge.State,
+ parentEdges: Seq[EdgeWithScore] = Nil,
+ originalEdgeOpt: Option[S2EdgeLike] = None,
+ pendingEdgeOpt: Option[S2EdgeLike] = None,
+ statusCode: Byte = 0,
+ lockTs: Option[Long] = None,
+ tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = {
+ val edge = S2Edge(
+ graph,
+ srcVertex,
+ tgtVertex,
+ innerLabel,
+ dir,
+ op,
+ version,
+ S2Edge.EmptyProps,
+ parentEdges,
+ originalEdgeOpt,
+ pendingEdgeOpt,
+ statusCode,
+ lockTs,
+ tsInnerValOpt)
+ S2Edge.fillPropsWithTs(edge, propsWithTs)
+ edge
+ }
+
+ /**
+ * helper to create new SnapshotEdge instance from given parameters on memory(not actually stored in storage).
+ *
+ * Note that this is only available to S2Graph, not structure.Graph so only internal code should use this method.
+ * @param srcVertex
+ * @param tgtVertex
+ * @param label
+ * @param dir
+ * @param op
+ * @param version
+ * @param propsWithTs
+ * @param pendingEdgeOpt
+ * @param statusCode
+ * @param lockTs
+ * @param tsInnerValOpt
+ * @return
+ */
+ private[core] def newSnapshotEdge(srcVertex: S2VertexLike,
+ tgtVertex: S2VertexLike,
+ label: Label,
+ dir: Int,
+ op: Byte,
+ version: Long,
+ propsWithTs: S2Edge.State,
+ pendingEdgeOpt: Option[S2EdgeLike],
+ statusCode: Byte = 0,
+ lockTs: Option[Long],
+ tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
+ val snapshotEdge = new SnapshotEdge(graph, srcVertex, tgtVertex, label, dir, op, version, S2Edge.EmptyProps,
+ pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+ S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs)
+ snapshotEdge
+ }
+
+ def newVertexId(serviceName: String)(columnName: String)(id: Any): VertexId = {
+ val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+ val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+ newVertexId(service, column, id)
+ }
+
+ /**
+ * helper to create S2Graph's internal S2VertexId instance with given parameters.
+ * @param service
+ * @param column
+ * @param id
+ * @return
+ */
+ def newVertexId(service: Service,
+ column: ServiceColumn,
+ id: Any): VertexId = {
+ val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(id)(column.schemaVersion)
+ new VertexId(column, innerVal)
+ }
+
+ def newVertex(id: VertexId,
+ ts: Long = System.currentTimeMillis(),
+ props: S2Vertex.Props = S2Vertex.EmptyProps,
+ op: Byte = 0,
+ belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
+ val vertex = new S2Vertex(graph, id, ts, S2Vertex.EmptyProps, op, belongLabelIds)
+ S2Vertex.fillPropsWithTs(vertex, props)
+ vertex
+ }
+
+ def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2EdgeLike = {
+ val srcVertex = queryRequest.vertex
+ val queryParam = queryRequest.queryParam
+ val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
+ val label = queryParam.label
+ val labelWithDir = queryParam.labelWithDir
+ val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
+ val propsWithTs = label.EmptyPropsWithTs
+
+ tgtVertexIdOpt match {
+ case Some(tgtVertexId) => // _to is given.
+ /* we use toSnapshotEdge so dont need to swap src, tgt */
+ val src = srcVertex.innerId
+ val tgt = tgtVertexId
+ val (srcVId, tgtVId) = (new SourceVertexId(srcColumn, src), new TargetVertexId(tgtColumn, tgt))
+ val (srcV, tgtV) = (newVertex(srcVId), newVertex(tgtVId))
+
+ newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
+ case None =>
+ val src = srcVertex.innerId
+ val srcVId = new SourceVertexId(srcColumn, src)
+ val srcV = newVertex(srcVId)
+
+ newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
+ }
+ }
+
+ def buildEdgesToDelete(edgeWithScoreLs: Seq[EdgeWithScore], requestTs: Long): Seq[EdgeWithScore] = {
+ if (edgeWithScoreLs.isEmpty) Nil
+ else {
+ val head = edgeWithScoreLs.head
+ val label = head.edge.innerLabel
+
+ //Degree edge?
+ edgeWithScoreLs.map { case edgeWithScore =>
+ val edge = edgeWithScore.edge
+ val copiedEdge = label.consistencyLevel match {
+ case "strong" =>
+ edge.copyEdge(op = GraphUtil.operations("delete"),
+ version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
+ case _ =>
+ edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
+ }
+
+ val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
+ // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
+ edgeToDelete
+ }
+ }
+ }
+}