You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/03/15 10:28:34 UTC
[18/27] incubator-s2graph git commit: [S2GRAPH-57]: Change package
names into org.apache.s2graph.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
new file mode 100644
index 0000000..c1a5738
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -0,0 +1,567 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.{JsNumber, Json}
+
+import scala.collection.JavaConversions._
+import scala.util.hashing.MurmurHash3
+
+
+/**
+ * Snapshot (inverted) edge: the single canonical row per (src, tgt, label) that
+ * records the last merged property state; used by the strong-consistency
+ * mutation path.
+ *
+ * @param props          meta-seq -> value-with-timestamp; MUST contain LabelMeta.timeStampSeq
+ * @param pendingEdgeOpt edge whose mutation is in flight (lock owner), if any
+ * @param statusCode     state-machine status for the CAS-based mutation protocol
+ * @param lockTs         timestamp at which the lock was taken, if locked
+ */
+case class SnapshotEdge(srcVertex: Vertex,
+                        tgtVertex: Vertex,
+                        labelWithDir: LabelWithDirection,
+                        op: Byte,
+                        version: Long,
+                        props: Map[Byte, InnerValLikeWithTs],
+                        pendingEdgeOpt: Option[Edge],
+                        statusCode: Byte = 0,
+                        lockTs: Option[Long]) extends JSONParser {
+
+  // Native Scala `contains` instead of JavaConversions' `containsKey` (same check,
+  // no implicit java.util.Map conversion).
+  if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
+
+  val label = Label.findById(labelWithDir.labelId)
+  val schemaVer = label.schemaVersion
+  lazy val propsWithoutTs = props.mapValues(_.innerVal)
+  val ts = props(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+  /** Materializes this snapshot back into a regular Edge, carrying over lock state. */
+  def toEdge: Edge = {
+    // NOTE: a previous revision computed an unused local `ts` here; removed as dead code.
+    Edge(srcVertex, tgtVertex, labelWithDir, op,
+      version, props, pendingEdgeOpt = pendingEdgeOpt,
+      statusCode = statusCode, lockTs = lockTs)
+  }
+
+  /** Props rendered as (meta name -> JsValue), plus the snapshot version. */
+  def propsWithName = (for {
+    (seq, v) <- props
+    meta <- label.metaPropsMap.get(seq)
+    jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
+  } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
+
+  // only for debug
+  def toLogString() = {
+    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t")
+  }
+}
+
+/**
+ * One materialized row of an edge under a specific label index (labelIndexSeq).
+ * `props` holds plain InnerValLike values keyed by meta seq and MUST contain
+ * the timestamp property.
+ */
+case class IndexEdge(srcVertex: Vertex,
+ tgtVertex: Vertex,
+ labelWithDir: LabelWithDirection,
+ op: Byte,
+ version: Long,
+ labelIndexSeq: Byte,
+ props: Map[Byte, InnerValLike]) extends JSONParser {
+ if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
+ // assert(props.containsKey(LabelMeta.timeStampSeq))
+
+ // NOTE(review): parses InnerValLike.toString — assumes it renders the bare numeric value; verify
+ val ts = props(LabelMeta.timeStampSeq).toString.toLong
+ // true when this row encodes a degree (edge-count) entry rather than a real edge
+ val degreeEdge = props.contains(LabelMeta.degreeSeq)
+ lazy val label = Label.findById(labelWithDir.labelId)
+ val schemaVer = label.schemaVersion
+ lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
+ // per-sort-key default values, used for props missing from `props`
+ lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta =>
+ val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
+ meta.seq -> innerVal
+ }.toMap
+
+ lazy val labelIndexMetaSeqs = labelIndex.metaSeqs
+
+ /** TODO: make sure call of this class fill props as this assumes */
+ // Ordered (seq -> value) pairs forming the index sort key. Missing entries are
+ // filled from version / tgtVertex.innerId / index defaults; `_from` is rejected.
+ lazy val orders = for (k <- labelIndexMetaSeqs) yield {
+ props.get(k) match {
+ case None =>
+
+ /**
+ * TODO: ugly hack
+ * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
+ */
+ val v = k match {
+ case LabelMeta.timeStampSeq => InnerVal.withLong(version, schemaVer)
+ case LabelMeta.toSeq => tgtVertex.innerId
+ case LabelMeta.fromSeq => //srcVertex.innerId
+ // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data.
+ throw new RuntimeException("_from on indexProps is not supported")
+ case _ => defaultIndexMetas(k)
+ }
+
+ k -> v
+ case Some(v) => k -> v
+ }
+ }
+
+ lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
+ // props not consumed by the sort key; these land in the row's value part
+ lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v
+
+ // lift plain values to (value, version-as-ts) pairs for Edge construction
+ lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
+
+ //TODO:
+ // lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList
+
+ // false when some sort-key prop had neither a supplied value nor a default fill
+ lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
+
+ /** Public (seq >= 0) props rendered as (meta name -> JsValue). */
+ def propsWithName = for {
+ (seq, v) <- props
+ meta <- label.metaPropsMap.get(seq) if seq >= 0
+ jsValue <- innerValToJsValue(v, meta.dataType)
+ } yield meta.name -> jsValue
+
+
+ def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, propsWithTs)
+
+ // only for debug
+ def toLogString() = {
+ List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t")
+ }
+}
+
+/**
+ * In-memory representation of a directed, labeled edge between two vertices.
+ *
+ * @param propsWithTs     meta-seq -> value-with-timestamp; MUST contain LabelMeta.timeStampSeq
+ * @param parentEdges     scored parents accumulated during multi-step traversal
+ * @param originalEdgeOpt pre-mutation edge, when this edge is a rewrite of one
+ * @param pendingEdgeOpt  edge whose mutation is in flight (lock owner), if any
+ * @param statusCode      state-machine status for the CAS-based mutation protocol
+ * @param lockTs          timestamp at which the lock was taken, if locked
+ */
+case class Edge(srcVertex: Vertex,
+                tgtVertex: Vertex,
+                labelWithDir: LabelWithDirection,
+                op: Byte = GraphUtil.defaultOpByte,
+                version: Long = System.currentTimeMillis(),
+                propsWithTs: Map[Byte, InnerValLikeWithTs],
+                parentEdges: Seq[EdgeWithScore] = Nil,
+                originalEdgeOpt: Option[Edge] = None,
+                pendingEdgeOpt: Option[Edge] = None,
+                statusCode: Byte = 0,
+                lockTs: Option[Long] = None) extends GraphElement with JSONParser {
+
+  // Check the constructor parameter directly; the old code went through the
+  // derived `props` view and JavaConversions' `containsKey`.
+  if (!propsWithTs.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
+  val schemaVer = label.schemaVersion
+  val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+  /** Property values stripped of their per-property timestamps (lazy mapValues view). */
+  def props = propsWithTs.mapValues(_.innerVal)
+
+  /**
+   * All edges implied by this one for index maintenance: directed labels yield
+   * this edge plus its reversed duplicate; undirected labels yield both
+   * orientations normalized to the "out" direction.
+   */
+  def relatedEdges = {
+    if (labelWithDir.isDirected) List(this, duplicateEdge)
+    else {
+      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
+      val base = copy(labelWithDir = outDir)
+      List(base, base.reverseSrcTgtEdge)
+    }
+  }
+
+  /** Source-side vertex re-keyed by the label's column id (endpoints swap when dir == "in"). */
+  def srcForVertex = {
+    val belongLabelIds = Seq(labelWithDir.labelId)
+    if (labelWithDir.dir == GraphUtil.directions("in")) {
+      Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+    } else {
+      Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+    }
+  }
+
+  /** Target-side vertex re-keyed by the label's column id (endpoints swap when dir == "in"). */
+  def tgtForVertex = {
+    val belongLabelIds = Seq(labelWithDir.labelId)
+    if (labelWithDir.dir == GraphUtil.directions("in")) {
+      Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+    } else {
+      Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+    }
+  }
+
+  /** Same edge viewed from the opposite endpoint (src/tgt and direction both flipped). */
+  def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
+
+  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
+
+  def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
+
+  // NOTE(review): resolved through the meta store on every call — presumably cached; verify
+  def label = Label.findById(labelWithDir.labelId)
+
+  def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
+
+  override def serviceName = label.serviceName
+
+  override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|")
+
+  override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|")
+
+  override def isAsync = label.isAsync
+
+  /** True when this edge encodes a degree (edge-count) entry. */
+  def isDegree = propsWithTs.contains(LabelMeta.degreeSeq)
+
+  /** Props guaranteed to carry the timestamp entry (backfilled from `ts` when absent). */
+  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
+    case Some(_) => props
+    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
+  }
+
+  /** propsPlusTs without internal (negative-seq) properties. */
+  def propsPlusTsValid = propsPlusTs.filter(kv => kv._1 >= 0)
+
+  /** One IndexEdge per index defined on this label. */
+  def edgesWithIndex = for (labelOrder <- labelOrders) yield {
+    IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTs)
+  }
+
+  /** Same as edgesWithIndex but restricted to public props. */
+  def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
+    IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTsValid)
+  }
+
+  /** force direction as out on invertedEdge */
+  def toSnapshotEdge: SnapshotEdge = {
+    val (smaller, larger) = (srcForVertex, tgtForVertex)
+
+    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
+
+    // the prepended timestamp entry loses to any existing entry on key collision
+    val ret = SnapshotEdge(smaller, larger, newLabelWithDir, op, version,
+      Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs,
+      pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs)
+    ret
+  }
+
+  // Identity is (src innerId, label+dir, tgt innerId) only; props/version/op are ignored.
+  override def hashCode(): Int = {
+    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case e: Edge =>
+      srcVertex.innerId == e.srcVertex.innerId &&
+        tgtVertex.innerId == e.tgtVertex.innerId &&
+        labelWithDir == e.labelWithDir
+    case _ => false
+  }
+
+  /** Public (seq > 0) props rendered as (meta name -> JsValue). */
+  def propsWithName = for {
+    (seq, v) <- props
+    meta <- label.metaPropsMap.get(seq) if seq > 0
+    jsValue <- innerValToJsValue(v, meta.dataType)
+  } yield meta.name -> jsValue
+
+  /** Copy of this edge pointing at a new target id within the same column. */
+  def updateTgtVertex(id: InnerValLike) = {
+    val newId = TargetVertexId(tgtVertex.id.colId, id)
+    val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props)
+    Edge(srcVertex, newTgtVertex, labelWithDir, op, version, propsWithTs)
+  }
+
+  /**
+   * Weighted score: sum of weight * numeric prop value over the rank params.
+   * countSeq contributes 1 per occurrence; an unparsable value counts as 1.0.
+   * Returns 1.0 when no rank params are given.
+   */
+  def rank(r: RankParam): Double =
+    if (r.keySeqAndWeights.size <= 0) 1.0 // was the Float literal 1.0f; this method returns Double
+    else {
+      var sum: Double = 0
+
+      for ((seq, w) <- r.keySeqAndWeights) {
+        seq match {
+          case LabelMeta.countSeq => sum += 1
+          case _ =>
+            propsWithTs.get(seq) match {
+              case None => // missing prop contributes nothing
+              case Some(innerValWithTs) =>
+                val cost = try innerValWithTs.innerVal.toString.toDouble catch {
+                  case e: Exception =>
+                    logger.error("toInnerval failed in rank", e)
+                    1.0
+                }
+                sum += w * cost
+            }
+        }
+      }
+      sum
+    }
+
+  // only for debug
+  def toLogString: String = {
+    val ret =
+      if (propsWithName.nonEmpty)
+        List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName))
+      else
+        List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label)
+
+    ret.mkString("\t")
+  }
+}
+
+/**
+ * The concrete storage mutation computed for one logical edge operation:
+ * index-edge rows to delete, index-edge rows to insert, and (optionally) the
+ * replacement snapshot edge.
+ */
+case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
+                      edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
+                      newSnapshotEdge: Option[SnapshotEdge] = None) {
+
+  /** Debug dump: deletes, inserts and the new snapshot, framed by dashed rulers. */
+  def toLogString: String = {
+    val ruler = "-" * 50
+    val deleteLines = edgesToDelete.map(_.toLogString).mkString("\n")
+    val insertLines = edgesToInsert.map(_.toLogString).mkString("\n")
+    val snapshotLines = newSnapshotEdge.map(_.toLogString).mkString("\n")
+
+    List("\n", ruler, s"deletes: $deleteLines", s"inserts: $insertLines", s"snapshot: $snapshotLines", ruler, "\n").mkString("\n")
+  }
+}
+
+/**
+ * Edge companion: pure merge functions that fold incoming request props into a
+ * stored snapshot's props (one per operation type), plus the builders that turn
+ * the merged state into an EdgeMutate for the storage layer.
+ */
+object Edge extends JSONParser {
+ // snapshot version bump applied per successful mutation
+ val incrementVersion = 1L
+ // sentinel timestamp used when no snapshot / lastDeletedAt exists
+ val minTsVal = 0L
+
+ /** now version information is required also **/
+ type State = Map[Byte, InnerValLikeWithTs]
+ // (old snapshot props, request props, request ts, schema version)
+ type PropsPairWithTs = (State, State, Long, String)
+ type MergeState = PropsPairWithTs => (State, Boolean)
+ type UpdateFunc = (Option[Edge], Edge, MergeState)
+
+ /** True when lastDeletedAt exists and no other prop is newer than it (edge logically deleted). */
+ def allPropsDeleted(props: Map[Byte, InnerValLikeWithTs]): Boolean =
+ if (!props.containsKey(LabelMeta.lastDeletedAt)) false
+ else {
+ val lastDeletedAt = props.get(LabelMeta.lastDeletedAt).get.ts
+ val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt
+
+ propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
+ }
+
+ /** Builds delete mutations for all of requestEdge's related index edges plus its snapshot. */
+ def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, EdgeMutate) = {
+ // assert(invertedEdge.isEmpty)
+ // assert(requestEdge.op == GraphUtil.operations("delete"))
+
+ val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+ val edgeInverted = Option(requestEdge.toSnapshotEdge)
+
+ (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, edgeInverted))
+ }
+
+ /**
+ * Picks a merge function per request edge from its op and the label's
+ * consistency level, drops requests whose ts equals the snapshot's (already
+ * applied), sorts the rest by ts, folds them into the snapshot props, and
+ * builds the resulting mutation. Returns (last applied edge, mutation).
+ */
+ def buildOperation(invertedEdge: Option[Edge], requestEdges: Seq[Edge]): (Edge, EdgeMutate) = {
+ // logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
+ // logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
+ val oldPropsWithTs =
+ if (invertedEdge.isEmpty) Map.empty[Byte, InnerValLikeWithTs] else invertedEdge.get.propsWithTs
+
+ val funcs = requestEdges.map { edge =>
+ if (edge.op == GraphUtil.operations("insert")) {
+ edge.label.consistencyLevel match {
+ case "strong" => Edge.mergeUpsert _
+ case _ => Edge.mergeInsertBulk _
+ }
+ } else if (edge.op == GraphUtil.operations("insertBulk")) {
+ Edge.mergeInsertBulk _
+ } else if (edge.op == GraphUtil.operations("delete")) {
+ edge.label.consistencyLevel match {
+ case "strong" => Edge.mergeDelete _
+ case _ => throw new RuntimeException("not supported")
+ }
+ }
+ else if (edge.op == GraphUtil.operations("update")) Edge.mergeUpdate _
+ else if (edge.op == GraphUtil.operations("increment")) Edge.mergeIncrement _
+ else throw new RuntimeException(s"not supported operation on edge: $edge")
+ }
+
+ // requests with ts equal to the snapshot's are treated as already applied
+ val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal)
+ val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != _._1.ts).sortBy(_._1.ts)
+
+ if (requestWithFuncs.isEmpty) {
+ (requestEdges.head, EdgeMutate())
+ } else {
+ val requestEdge = requestWithFuncs.last._1
+ var prevPropsWithTs = oldPropsWithTs
+
+ for {
+ (requestEdge, func) <- requestWithFuncs
+ } {
+ val (_newPropsWithTs, _) = func(prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer)
+ prevPropsWithTs = _newPropsWithTs
+ // logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
+ }
+ val requestTs = requestEdge.ts
+ /** version should be monotonically increasing so our RPC mutation should be applied safely */
+ val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
+ // final ts = the newest timestamp seen across merged props and the request itself
+ val maxTs = prevPropsWithTs.map(_._2.ts).max
+ val newTs = if (maxTs > requestTs) maxTs else requestTs
+ val propsWithTs = prevPropsWithTs ++
+ Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), newTs))
+ val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)
+
+ // logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
+ // logger.error(s"$propsWithTs")
+ (requestEdge, edgeMutate)
+ }
+ }
+
+ /**
+ * Turns (old props, new props) into a concrete mutation:
+ * 1) nothing changed -> empty mutation;
+ * 2) only lastDeletedAt changed -> snapshot-only update;
+ * 3) otherwise delete old index edges, insert new ones (unless all props are
+ * deleted), and always write a new snapshot edge.
+ */
+ def buildMutation(snapshotEdgeOpt: Option[Edge],
+ requestEdge: Edge,
+ newVersion: Long,
+ oldPropsWithTs: Map[Byte, InnerValLikeWithTs],
+ newPropsWithTs: Map[Byte, InnerValLikeWithTs]): EdgeMutate = {
+ if (oldPropsWithTs == newPropsWithTs) {
+ // all requests should be dropped. so empty mutation.
+ // logger.error(s"Case 1")
+ EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None)
+ } else {
+ val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAt)
+ // keep the old op when the snapshot's props are newer than this request
+ val newOp = snapshotEdgeOpt match {
+ case None => requestEdge.op
+ case Some(old) =>
+ val oldMaxTs = old.propsWithTs.map(_._2.ts).max
+ if (oldMaxTs > requestEdge.ts) old.op
+ else requestEdge.op
+ }
+
+ val newSnapshotEdgeOpt =
+ Option(requestEdge.copy(op = newOp, propsWithTs = newPropsWithTs, version = newVersion).toSnapshotEdge)
+ // delete request must always update snapshot.
+ if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.containsKey(LabelMeta.lastDeletedAt)) {
+ // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt.
+ // logger.error(s"Case 2")
+ EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt)
+ } else {
+ // logger.error(s"Case 3")
+ val edgesToDelete = snapshotEdgeOpt match {
+ case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
+ snapshotEdge.copy(op = GraphUtil.defaultOpByte).
+ relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+ case _ => Nil
+ }
+
+ val edgesToInsert =
+ if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
+ else
+ requestEdge.copy(version = newVersion, propsWithTs = newPropsWithTs, op = GraphUtil.defaultOpByte).
+ relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+
+ EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt)
+ }
+ }
+ }
+
+ /**
+ * Upsert merge: per prop the newer value wins; old props older than requestTs
+ * are dropped (internal seq < 0 props are kept); new props newer than
+ * lastDeletedAt are added. Returns (merged state, whether a write is needed).
+ */
+ def mergeUpsert(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
+ var shouldReplace = false
+ val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
+ val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
+ val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
+ propsWithTs.get(k) match {
+ case Some(newValWithTs) =>
+ assert(oldValWithTs.ts >= lastDeletedAt)
+ val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
+ else {
+ shouldReplace = true
+ newValWithTs
+ }
+ Some(k -> v)
+
+ case None =>
+ assert(oldValWithTs.ts >= lastDeletedAt)
+ if (oldValWithTs.ts >= requestTs || k < 0) Some(k -> oldValWithTs)
+ else {
+ shouldReplace = true
+ None
+ }
+ }
+ }
+ val existInNew =
+ for {
+ (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
+ } yield {
+ shouldReplace = true
+ Some(k -> newValWithTs)
+ }
+
+ ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
+ }
+
+ /**
+ * Update merge: like upsert, but old props that the request does not mention
+ * are always carried over (update merges with previous valid values).
+ */
+ def mergeUpdate(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
+ var shouldReplace = false
+ val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
+ val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
+ val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
+ propsWithTs.get(k) match {
+ case Some(newValWithTs) =>
+ assert(oldValWithTs.ts >= lastDeletedAt)
+ val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
+ else {
+ shouldReplace = true
+ newValWithTs
+ }
+ Some(k -> v)
+ case None =>
+ // important: update need to merge previous valid values.
+ assert(oldValWithTs.ts >= lastDeletedAt)
+ Some(k -> oldValWithTs)
+ }
+ }
+ val existInNew = for {
+ (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
+ } yield {
+ shouldReplace = true
+ Some(k -> newValWithTs)
+ }
+
+ ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
+ }
+
+ /**
+ * Increment merge: for non-timestamp props, when the request value is newer
+ * the inner values are ADDED (keeping the old ts); the timestamp prop follows
+ * newest-wins like upsert.
+ */
+ def mergeIncrement(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
+ var shouldReplace = false
+ val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
+ val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
+ val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
+ propsWithTs.get(k) match {
+ case Some(newValWithTs) =>
+ if (k == LabelMeta.timeStampSeq) {
+ val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
+ else {
+ shouldReplace = true
+ newValWithTs
+ }
+ Some(k -> v)
+ } else {
+ if (oldValWithTs.ts >= newValWithTs.ts) {
+ Some(k -> oldValWithTs)
+ } else {
+ assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= lastDeletedAt)
+ shouldReplace = true
+ // incr(t0), incr(t2), d(t1) => deleted
+ Some(k -> InnerValLikeWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts))
+ }
+ }
+
+ case None =>
+ assert(oldValWithTs.ts >= lastDeletedAt)
+ Some(k -> oldValWithTs)
+ // if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None
+ }
+ }
+ val existInNew = for {
+ (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
+ } yield {
+ shouldReplace = true
+ Some(k -> newValWithTs)
+ }
+
+ ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
+ }
+
+ /**
+ * Delete merge: advances lastDeletedAt to requestTs (when newer), drops props
+ * older than it, and bumps the timestamp prop to requestTs when it was older.
+ */
+ def mergeDelete(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
+ var shouldReplace = false
+ val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
+ val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match {
+ case Some(prevDeletedAt) =>
+ if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts
+ else {
+ shouldReplace = true
+ requestTs
+ }
+ case None => {
+ shouldReplace = true
+ requestTs
+ }
+ }
+ val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
+ if (k == LabelMeta.timeStampSeq) {
+ if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs)
+ else {
+ shouldReplace = true
+ Some(k -> InnerValLikeWithTs.withLong(requestTs, requestTs, version))
+ }
+ } else {
+ if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs)
+ else {
+ shouldReplace = true
+ None
+ }
+ }
+ }
+ val mustExistInNew = Map(LabelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(lastDeletedAt, lastDeletedAt, version))
+ ((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace)
+ }
+
+ /** Bulk insert: replaces the state wholesale with the request props; always writes. */
+ def mergeInsertBulk(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
+ val (_, propsWithTs, _, _) = propsPairWithTs
+ (propsWithTs, true)
+ }
+
+ /** Parses a tab-separated bulk-format line into an Edge (delegates to Graph.toEdge). */
+ def fromString(s: String): Option[Edge] = Graph.toEdge(s)
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
new file mode 100644
index 0000000..d3177b8
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -0,0 +1,159 @@
+package org.apache.s2graph.core
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicLong
+
+import akka.actor._
+import akka.routing.{Broadcast, RoundRobinPool}
+import com.typesafe.config.Config
+import org.apache.kafka.clients.producer._
+
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+/**
+ * Routes failed mutations to a Kafka topic (`mutateFailed_<phase>`) through a
+ * small pool of KafkaAggregatorActor routees. `apply(config)` wires everything
+ * up; when no broker list is configured the handler stays a no-op (producer and
+ * routees remain None).
+ */
+object ExceptionHandler {
+
+  var producer: Option[Producer[Key, Val]] = None
+  var properties: Option[Properties] = None
+  val numOfRoutees = 1
+  val actorSystem = ActorSystem("ExceptionHandler")
+  var routees: Option[ActorRef] = None
+  var shutdownTime = 1000 millis
+  var phase = "dev"
+  // lazy so it observes `phase` as set by apply(), not the initial "dev"
+  lazy val failTopic = s"mutateFailed_${phase}"
+
+  /** Reads broker/phase config, creates the Kafka producer (best-effort) and starts routees. */
+  def apply(config: Config) = {
+    properties =
+      if (config.hasPath("kafka.metadata.broker.list")) Option(kafkaConfig(config))
+      else None
+    phase = if (config.hasPath("phase")) config.getString("phase") else "dev"
+    producer = for {
+      props <- properties
+      p <- try {
+        Option(new KafkaProducer[Key, Val](props))
+      } catch {
+        // was `case e: Throwable => None`, which also swallowed fatal errors
+        // (OutOfMemoryError, InterruptedException, ...); only absorb non-fatal ones
+        case NonFatal(e) => None
+      }
+    } yield {
+      p
+    }
+    init()
+  }
+
+  def props(producer: Producer[Key, Val]) = Props(classOf[KafkaAggregatorActor], producer)
+
+  /** Spins up a round-robin pool of aggregator actors; no-op when there is no producer. */
+  def init() = {
+    for {
+      p <- producer
+    } {
+      routees = Option(actorSystem.actorOf(RoundRobinPool(numOfRoutees).props(props(p))))
+    }
+  }
+
+  /** Broadcasts PoisonPill to the pool, then waits `shutdownTime` for drain. */
+  def shutdown() = {
+    routees.map(_ ! Broadcast(PoisonPill))
+    Thread.sleep(shutdownTime.length)
+  }
+
+  def enqueues(msgs: Seq[KafkaMessage]) = {
+    msgs.foreach(enqueue)
+  }
+
+  /** Fire-and-forget: silently dropped when routees were never started. */
+  def enqueue(msg: KafkaMessage) = {
+    routees.map(_ ! msg)
+  }
+
+  /** Builds new-producer Properties from config; broker list defaults to "localhost". */
+  def kafkaConfig(config: Config) = {
+    val props = new Properties()
+
+    /** all default configuration for new producer */
+    val brokers =
+      if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
+      else "localhost"
+    props.put("bootstrap.servers", brokers)
+    props.put("acks", "1")
+    props.put("buffer.memory", "33554432")
+    props.put("compression.type", "snappy")
+    props.put("retries", "0")
+    props.put("batch.size", "16384")
+    props.put("linger.ms", "0")
+    props.put("max.request.size", "1048576")
+    props.put("receive.buffer.bytes", "32768")
+    props.put("send.buffer.bytes", "131072")
+    props.put("timeout.ms", "30000")
+    props.put("block.on.buffer.full", "false")
+    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    props
+  }
+
+  type Key = String
+  type Val = String
+
+  /** Wraps a graph element into a Kafka record keyed by its queue partition key. */
+  def toKafkaMessage(topic: String = failTopic, element: GraphElement, originalString: Option[String] = None) = {
+    KafkaMessage(new ProducerRecord[Key, Val](topic, element.queuePartitionKey,
+      originalString.getOrElse(element.toLogString())))
+  }
+
+  case class KafkaMessage(msg: ProducerRecord[Key, Val])
+
+  case class Message(topic: String, msg: String)
+
+  case class BufferedKafkaMessage(msgs: Seq[ProducerRecord[Key, Val]], bufferSize: Int)
+
+  case class BufferedMessage(topic: String, bufferedMsgs: String, bufferSize: Int)
+
+  case object FlushBuffer
+
+  case class UpdateHealth(isHealty: Boolean)
+
+  case object ShowMetrics
+
+}
+
+/**
+ * Actor that forwards KafkaMessages to the given producer. A failed send is
+ * re-sent to self from the producer callback; a producer-side throw (e.g.
+ * exhausted buffer) stashes the message until any send succeeds, which
+ * unstashes everything. Logs counter stats every 10 seconds.
+ */
+class KafkaAggregatorActor(kafkaProducer: Producer[String, String]) extends Stash with ActorLogging {
+
+  import ExceptionHandler._
+
+  val failedCount = new AtomicLong(0L)
+  val successCount = new AtomicLong(0L)
+  val stashCount = new AtomicLong(0L)
+
+  implicit val ex = context.system.dispatcher
+
+  context.system.scheduler.schedule(0 millis, 10 seconds) {
+    self ! ShowMetrics
+  }
+
+  override def receive = {
+    case ShowMetrics =>
+      log.info(s"[Stats]: failed[${failedCount.get}], stashed[${stashCount.get}], success[${successCount.get}]")
+
+    case m: KafkaMessage =>
+      // capture self: the callback runs on a producer I/O thread, not in the actor
+      val replayTo = self
+      try {
+        kafkaProducer.send(m.msg, new Callback() {
+          override def onCompletion(meta: RecordMetadata, e: Exception) = {
+            if (e == null) {
+              // success: the producer is healthy again, release stashed messages
+              successCount.incrementAndGet()
+              unstashAll()
+              stashCount.set(0L)
+            } else {
+              // failure: count it and retry by re-sending to self
+              log.error(s"onCompletion: $e", e)
+              failedCount.incrementAndGet()
+              replayTo ! m
+            }
+          }
+        })
+      } catch {
+        // was `case e@(_: BufferExhaustedException | _: Throwable)`: the Throwable
+        // alternative made the first redundant and also caught fatal errors; only
+        // stash on non-fatal producer failures.
+        case NonFatal(e) =>
+          log.error(s"$e", e)
+          log.info(s"stash")
+          stash()
+          stashCount.incrementAndGet()
+      }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
new file mode 100644
index 0000000..b2cbd19
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -0,0 +1,379 @@
+package org.apache.s2graph.core
+
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.s2graph.core.mysqls.{Label, Model}
+import org.apache.s2graph.core.parsers.WhereParser
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
+import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
+import org.apache.s2graph.core.utils.logger
+
+import scala.collection.JavaConversions._
+import scala.collection._
+import scala.collection.mutable.ListBuffer
+import scala.concurrent._
+import scala.util.Try
+
+object Graph {
+ val DefaultScore = 1.0
+
+ // Fallback configuration merged under any user-supplied config (see class Graph).
+ private val DefaultConfigs: Map[String, AnyRef] = Map(
+ "hbase.zookeeper.quorum" -> "localhost",
+ "hbase.table.name" -> "s2graph",
+ "hbase.table.compression.algorithm" -> "gz",
+ "phase" -> "dev",
+ "db.default.driver" -> "com.mysql.jdbc.Driver",
+ "db.default.url" -> "jdbc:mysql://localhost:3306/graph_dev",
+ "db.default.password" -> "graph",
+ "db.default.user" -> "graph",
+ "cache.max.size" -> java.lang.Integer.valueOf(10000),
+ "cache.ttl.seconds" -> java.lang.Integer.valueOf(60),
+ "hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
+ "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort),
+ "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000),
+ "max.retry.number" -> java.lang.Integer.valueOf(100),
+ "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
+ "max.back.off" -> java.lang.Integer.valueOf(100),
+ "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
+ "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
+ "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
+ "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
+ "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
+ "s2graph.storage.backend" -> "hbase"
+ )
+
+ // NOTE(review): mutable global (var) default config - confirm nothing reassigns it at runtime.
+ var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
+
+ /** helpers for filterEdges */
+ // HashKey = (srcHash, labelId, dir, tgtHash, isDegree); FilterHashKey = (srcHash, tgtHash).
+ type HashKey = (Int, Int, Int, Int, Boolean)
+ type FilterHashKey = (Int, Int)
+ // Result bundles, per query param: duplicate edges, winning edge per key, and insertion order.
+ type Result = (ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
+ ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
+ ListBuffer[(HashKey, FilterHashKey, Edge, Double)])
+
+ /** Build the deduplication keys for an edge: HashKey carries the label id,
+ * direction and degree flag in addition to the endpoint hashes, while
+ * FilterHashKey identifies only the (src, tgt) pair used for include/exclude. */
+ def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
+ val srcHash = edge.srcVertex.innerId.hashCode()
+ val tgtHash = edge.tgtVertex.innerId.hashCode()
+ ((srcHash, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgtHash, isDegree), (srcHash, tgtHash))
+ }
+
+ /** Collect the (labelWithDir, vertex) pairs already reached by the given
+ * query results, so later steps can skip them. The visited vertex is the
+ * edge's target for out-direction edges and the source otherwise. */
+ def alreadyVisitedVertices(queryResultLs: Seq[QueryResult]): Map[(LabelWithDirection, Vertex), Boolean] = {
+ queryResultLs.flatMap { queryResult =>
+ queryResult.edgeWithScoreLs.map { edgeWithScore =>
+ val edge = edgeWithScore.edge
+ val visited =
+ if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex
+ else edge.srcVertex
+ (edge.labelWithDir, visited) -> true
+ }
+ }.toMap
+ }
+
+ /** common methods for filter out, transform, aggregate queryResult */
+ /** Apply the query param's edge transformer to produce the edges used for the
+ * next step. Note the `if !edge.isDegree` guard tests the ORIGINAL edge, so a
+ * degree edge yields no converted edges at all. */
+ def convertEdges(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = {
+ for {
+ convertedEdge <- queryParam.transformer.transform(edge, nextStepOpt) if !edge.isDegree
+ } yield convertedEdge
+ }
+
+ /** Compute the time-decay multiplier for an edge's score.
+ * Returns 1.0 when the query has no timeDecay configured; otherwise reads a
+ * reference timestamp from the configured label meta prop (falling back to
+ * edge.ts when the prop is missing or on any error) and returns
+ * timeDecay.decay(queryParam.timestamp - ts). */
+ def processTimeDecay(queryParam: QueryParam, edge: Edge) = {
+ /** process time decay */
+ val tsVal = queryParam.timeDecay match {
+ case None => 1.0
+ case Some(timeDecay) =>
+ // Inner tsVal (shadows the outer name) holds the raw reference timestamp.
+ val tsVal = try {
+ val labelMeta = edge.label.metaPropsMap(timeDecay.labelMetaSeq)
+ val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMetaSeq)
+ innerValWithTsOpt.map { innerValWithTs =>
+ val innerVal = innerValWithTs.innerVal
+ labelMeta.dataType match {
+ // LONG-typed props may be stored as BigDecimal; unwrap without string round-trip.
+ case InnerVal.LONG => innerVal.value match {
+ case n: BigDecimal => n.bigDecimal.longValue()
+ case _ => innerVal.toString().toLong
+ }
+ case _ => innerVal.toString().toLong
+ }
+ } getOrElse(edge.ts)
+ } catch {
+ case e: Exception =>
+ // Best-effort: fall back to the edge timestamp rather than failing the query.
+ logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
+ edge.ts
+ }
+ val timeDiff = queryParam.timestamp - tsVal
+ timeDecay.decay(timeDiff)
+ }
+
+ tsVal
+ }
+
+ /** Merge a newly-scored edge into the per-queryParam accumulators.
+ * The first occurrence of a hashKey is recorded in both resultEdges and the
+ * insertion-ordered edgeWithScoreSorted; later occurrences are resolved by
+ * the label's duplicate policy - except under strong consistency, where
+ * duplicates cannot occur and the check is skipped. */
+ def aggregateScore(newScore: Double,
+ resultEdges: ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
+ duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
+ edgeWithScoreSorted: ListBuffer[(HashKey, FilterHashKey, Edge, Double)],
+ hashKey: HashKey,
+ filterHashKey: FilterHashKey,
+ queryParam: QueryParam,
+ convertedEdge: Edge) = {
+
+ /** skip duplicate policy check if consistencyLevel is strong */
+ if (queryParam.label.consistencyLevel != "strong" && resultEdges.containsKey(hashKey)) {
+ val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey)
+ //TODO:
+ queryParam.duplicatePolicy match {
+ case Query.DuplicatePolicy.First => // do nothing: keep the first-seen edge and score.
+ case Query.DuplicatePolicy.Raw =>
+ // Keep every duplicate as-is, recorded separately under the same key.
+ if (duplicateEdges.containsKey(hashKey)) {
+ duplicateEdges.get(hashKey).append(convertedEdge -> newScore)
+ } else {
+ val newBuffer = new ListBuffer[(Edge, Double)]
+ newBuffer.append(convertedEdge -> newScore)
+ duplicateEdges.put(hashKey, newBuffer)
+ }
+ case Query.DuplicatePolicy.CountSum =>
+ // Count occurrences: each duplicate adds 1 regardless of its score.
+ resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 1))
+ case _ =>
+ // Default (sum): accumulate duplicate scores onto the first-seen edge.
+ resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + newScore))
+ }
+ } else {
+ resultEdges.put(hashKey, (filterHashKey, convertedEdge, newScore))
+ edgeWithScoreSorted.append((hashKey, filterHashKey, convertedEdge, newScore))
+ }
+ }
+
+ /** Assemble the final QueryResult for one request: walk edges in insertion
+ * order, drop those excluded (unless explicitly included), re-read the
+ * aggregated score for each key, expand recorded duplicates, and apply the
+ * query param's score threshold. */
+ def aggregateResults(queryRequestWithResult: QueryRequestWithResult,
+ queryParamResult: Result,
+ edgesToInclude: util.HashSet[FilterHashKey],
+ edgesToExclude: util.HashSet[FilterHashKey]): QueryRequestWithResult = {
+ val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+ val (query, stepIdx, _, queryParam) = QueryRequest.unapply(queryRequest).get
+
+ val (duplicateEdges, resultEdges, edgeWithScoreSorted) = queryParamResult
+ val edgesWithScores = for {
+ (hashKey, filterHashKey, edge, _) <- edgeWithScoreSorted if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+ score = resultEdges.get(hashKey)._3
+ (duplicateEdge, aggregatedScore) <- fetchDuplicatedEdges(edge, score, hashKey, duplicateEdges) if aggregatedScore >= queryParam.threshold
+ } yield EdgeWithScore(duplicateEdge, aggregatedScore)
+
+ QueryRequestWithResult(queryRequest, QueryResult(edgesWithScores))
+ }
+
+ /** Return the primary (edge, score) pair followed by any duplicates recorded
+ * under the same hash key (empty when none were recorded). */
+ def fetchDuplicatedEdges(edge: Edge,
+ score: Double,
+ hashKey: HashKey,
+ duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]) = {
+ val duplicates =
+ if (duplicateEdges.containsKey(hashKey)) duplicateEdges.get(hashKey)
+ else Seq.empty
+ (edge -> score) +: duplicates
+ }
+
+ /** Apply the query param's parsed `where` clause to the result's edges.
+ * A WhereParser.success sentinel means "no filter": edges pass through as-is.
+ * NOTE(review): `queryParam.where.get` unwraps unconditionally - confirm the
+ * where clause is guaranteed successfully parsed by the time this runs. */
+ def queryResultWithFilter(queryRequestWithResult: QueryRequestWithResult) = {
+ val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+ val (_, _, _, queryParam) = QueryRequest.unapply(queryRequest).get
+ val whereFilter = queryParam.where.get
+ if (whereFilter == WhereParser.success) queryResult.edgeWithScoreLs
+ else queryResult.edgeWithScoreLs.withFilter(edgeWithScore => whereFilter.filter(edgeWithScore.edge))
+ }
+
+ /** Post-process raw fetch results for one step: apply where-filters and edge
+ * transformers, compute time-decayed / label-weighted scores, resolve
+ * duplicates per label policy, honor step-level include/exclude labels, and
+ * reassemble per-request results in the original order.
+ * NOTE(review): the `alreadyVisited` parameter is never used in this body -
+ * confirm whether visited-vertex filtering was meant to happen here. */
+ def filterEdges(queryResultLsFuture: Future[Seq[QueryRequestWithResult]],
+ alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean])
+ (implicit ec: scala.concurrent.ExecutionContext): Future[Seq[QueryRequestWithResult]] = {
+
+ queryResultLsFuture.map { queryRequestWithResultLs =>
+ if (queryRequestWithResultLs.isEmpty) Nil
+ else {
+ val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get
+ val (q, stepIdx, srcVertex, queryParam) = QueryRequest.unapply(queryRequest).get
+ val step = q.steps(stepIdx)
+
+ val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None
+
+ // Step-level include/exclude label sets, keyed by (labelId, dir).
+ val excludeLabelWithDirSet = new util.HashSet[(Int, Int)]
+ val includeLabelWithDirSet = new util.HashSet[(Int, Int)]
+ step.queryParams.filter(_.exclude).foreach(l => excludeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir))
+ step.queryParams.filter(_.include).foreach(l => includeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir))
+
+ // Filter keys gathered across ALL query params, applied in aggregateResults.
+ val edgesToExclude = new util.HashSet[FilterHashKey]()
+ val edgesToInclude = new util.HashSet[FilterHashKey]()
+
+ val queryParamResultLs = new ListBuffer[Result]
+ queryRequestWithResultLs.foreach { queryRequestWithResult =>
+ val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+ val queryParam = queryRequest.queryParam
+ val duplicateEdges = new util.concurrent.ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]()
+ val resultEdges = new util.concurrent.ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)]()
+ val edgeWithScoreSorted = new ListBuffer[(HashKey, FilterHashKey, Edge, Double)]
+ val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
+
+ // store degree value with Array.empty so if degree edge exist, it comes at very first.
+ def checkDegree() = queryResult.edgeWithScoreLs.headOption.exists { edgeWithScore =>
+ edgeWithScore.edge.isDegree
+ }
+ // Only the head of a result can be a degree edge; flag is cleared after the first iteration.
+ var isDegree = checkDegree()
+
+ val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir
+ val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey)
+ val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey)
+
+ queryResultWithFilter(queryRequestWithResult).foreach { edgeWithScore =>
+ val (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+ if (queryParam.transformer.isDefault) {
+ // Fast path: default transformer leaves the edge unchanged.
+ val convertedEdge = edge
+
+ val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+
+ /** check if this edge should be excluded. */
+ if (shouldBeExcluded && !isDegree) {
+ edgesToExclude.add(filterHashKey)
+ } else {
+ if (shouldBeIncluded && !isDegree) {
+ edgesToInclude.add(filterHashKey)
+ }
+ val tsVal = processTimeDecay(queryParam, convertedEdge)
+ val newScore = labelWeight * score * tsVal
+ aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+ }
+ } else {
+ // Transform path: same scoring/aggregation applied per converted edge.
+ convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge =>
+ val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+
+ /** check if this edge should be excluded. */
+ if (shouldBeExcluded && !isDegree) {
+ edgesToExclude.add(filterHashKey)
+ } else {
+ if (shouldBeIncluded && !isDegree) {
+ edgesToInclude.add(filterHashKey)
+ }
+ val tsVal = processTimeDecay(queryParam, convertedEdge)
+ val newScore = labelWeight * score * tsVal
+ aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+ }
+ }
+ }
+ isDegree = false
+ }
+ val ret = (duplicateEdges, resultEdges, edgeWithScoreSorted)
+ queryParamResultLs.append(ret)
+ }
+
+ val aggregatedResults = for {
+ (queryRequestWithResult, queryParamResult) <- queryRequestWithResultLs.zip(queryParamResultLs)
+ } yield {
+ aggregateResults(queryRequestWithResult, queryParamResult, edgesToInclude, edgesToExclude)
+ }
+
+ aggregatedResults
+ }
+ }
+ }
+
+ /** Parse one tab-separated bulk-load line into a GraphElement.
+ *
+ * @param s            the raw log line; parts(2) carries the element type.
+ * @param labelMapping optional label rename applied to edges before parsing.
+ * @return Some(element) on success, None on any parse failure (logged).
+ */
+ def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
+ val parts = GraphUtil.split(s)
+ val logType = parts(2)
+ // Fixed: use short-circuit || instead of bitwise | on Booleans.
+ val element = if (logType == "edge" || logType == "e") {
+ /** current only edge is considered to be bulk loaded */
+ labelMapping.get(parts(5)).foreach(toReplace => parts(5) = toReplace)
+ toEdge(parts)
+ } else if (logType == "vertex" || logType == "v") {
+ toVertex(parts)
+ } else {
+ throw new GraphExceptions.JsonParseException("log type is not exist in log.")
+ }
+
+ element
+ } recover {
+ case e: Exception =>
+ logger.error(s"$e", e)
+ None
+ } get
+
+
+ /** Parse a tab-separated log line into a vertex (delegates to the Array overload). */
+ def toVertex(s: String): Option[Vertex] = toVertex(GraphUtil.split(s))
+
+ /** Parse a tab-separated log line into an edge (delegates to the Array overload). */
+ def toEdge(s: String): Option[Edge] = toEdge(GraphUtil.split(s))
+
+ //"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}"
+ //{"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":1417616431},
+ /** Build an Edge from pre-split TSV parts:
+ * ts, op, logType, srcId, tgtId, label [, props [, direction]].
+ * Missing props default to "{}"; any direction other than in/out becomes "out".
+ * NOTE(review): the recover branch logs and RETHROWS, so the trailing `.get`
+ * propagates parse failures instead of ever returning None. */
+ def toEdge(parts: Array[String]): Option[Edge] = Try {
+ val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+ val props = if (parts.length >= 7) parts(6) else "{}"
+ val tempDirection = if (parts.length >= 8) parts(7) else "out"
+ val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
+
+ val edge = Management.toEdge(ts.toLong, operation, srcId, tgtId, label, direction, props)
+ // logger.debug(s"toEdge: $edge")
+ Some(edge)
+ } recover {
+ case e: Exception =>
+ logger.error(s"toEdge: $e", e)
+ throw e
+ } get
+
+ //"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}"
+ /** Build a Vertex from pre-split TSV parts:
+ * ts, op, logType, srcId, serviceName, columnName [, props].
+ * Missing props default to "{}".
+ * NOTE(review): like toEdge, the recover branch logs and rethrows, so the
+ * trailing `.get` propagates parse failures rather than returning None. */
+ def toVertex(parts: Array[String]): Option[Vertex] = Try {
+ val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+ val props = if (parts.length >= 7) parts(6) else "{}"
+ Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, colName, props))
+ } recover {
+ case e: Throwable =>
+ logger.error(s"toVertex: $e", e)
+ throw e
+ } get
+
+ /** Instantiate the storage backend named by "s2graph.storage.backend".
+ * Only "hbase" (AsynchbaseStorage) is currently supported. */
+ def initStorage(config: Config)(ec: ExecutionContext) = {
+ config.getString("s2graph.storage.backend") match {
+ case "hbase" => new AsynchbaseStorage(config)(ec)
+ case _ => throw new RuntimeException("not supported storage.")
+ }
+ }
+}
+
+/** Facade over the configured storage backend. Merges the user config with
+ * Graph.DefaultConfig, boots the model layer, then delegates reads/writes. */
+class Graph(_config: Config)(implicit val ec: ExecutionContext) {
+ val config = _config.withFallback(Graph.DefaultConfig)
+
+ // Initialize the model layer (DB + caches) before storage, which depends on it.
+ Model.apply(config)
+ Model.loadCache()
+
+ // TODO: Make storage client by config param
+ val storage = Graph.initStorage(config)(ec)
+
+
+ // Log the effective value of every key that has a default, for startup diagnostics.
+ for {
+ entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey)
+ (k, v) = (entry.getKey, entry.getValue)
+ } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
+
+ /** select */
+ def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = storage.checkEdges(params)
+
+ def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = storage.getEdges(q)
+
+ def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices)
+
+ /** write */
+ def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
+ storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
+
+ def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[Boolean]] =
+ storage.mutateElements(elements, withWait)
+
+ def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
+
+ def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateVertices(vertices, withWait)
+
+ def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges, withWait)
+
+ // Flush pending storage writes, then release model-layer resources.
+ def shutdown(): Unit = {
+ storage.flush()
+ Model.shutdown()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala
new file mode 100644
index 0000000..12bb941
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala
@@ -0,0 +1,10 @@
+package org.apache.s2graph.core
+
+/** Minimal contract for a loggable, queue-publishable graph element
+ * (presumably implemented by Edge and Vertex - confirm). */
+trait GraphElement {
+ /** Name of the service this element belongs to. */
+ def serviceName: String
+ /** Element timestamp (epoch millis presumably - confirm against producers). */
+ def ts: Long
+ /** Whether this element should be handled asynchronously (semantics defined by implementors). */
+ def isAsync: Boolean
+ /** Key used when publishing this element to a queue. */
+ def queueKey: String
+ /** Partition key used when publishing this element to a queue. */
+ def queuePartitionKey: String
+ /** Serialized log representation of this element. */
+ def toLogString(): String
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
new file mode 100644
index 0000000..e7d2d76
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
@@ -0,0 +1,28 @@
+package org.apache.s2graph.core
+
+/** Domain exception types used across s2graph core. Each carries a message;
+ * WhereParserException and BadQueryException optionally carry a cause. */
+object GraphExceptions {
+
+ case class JsonParseException(msg: String) extends Exception(msg)
+
+ case class LabelNotExistException(msg: String) extends Exception(msg)
+
+ case class ModelNotFoundException(msg: String) extends Exception(msg)
+
+ case class MaxPropSizeReachedException(msg: String) extends Exception(msg)
+
+ case class LabelAlreadyExistException(msg: String) extends Exception(msg)
+
+ case class InternalException(msg: String) extends Exception(msg)
+
+ case class IllegalDataTypeException(msg: String) extends Exception(msg)
+
+ case class WhereParserException(msg: String, ex: Exception = null) extends Exception(msg, ex)
+
+ case class BadQueryException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
+
+ case class InvalidHTableException(msg: String) extends Exception(msg)
+
+ case class FetchTimeoutException(msg: String) extends Exception(msg)
+
+ case class DropRequestException(msg: String) extends Exception(msg)
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
new file mode 100644
index 0000000..1a2b916
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
@@ -0,0 +1,139 @@
+package org.apache.s2graph.core
+
+import java.util.regex.Pattern
+
+import play.api.libs.json.Json
+
+import scala.util.hashing.MurmurHash3
+
+/** Stateless helpers for parsing/encoding graph log tokens: operation and
+ * direction codes, murmur3 hashing, and line splitting. */
+object GraphUtil {
+ private val TOKEN_DELIMITER = Pattern.compile("[\t]")
+
+ /** Operation token (short and long form) -> storage op byte. */
+ val operations = Map("i" -> 0, "insert" -> 0, "u" -> 1, "update" -> 1,
+ "increment" -> 2, "d" -> 3, "delete" -> 3,
+ "deleteAll" -> 4, "insertBulk" -> 5, "incrementCount" -> 6).map {
+ case (k, v) =>
+ k -> v.toByte
+ }
+ // Number of high bits of a murmur3 hash kept when folding into a Short (see murmur3).
+ val BitsForMurMurHash = 16
+ val bytesForMurMurHash = 2
+ val defaultOpByte = operations("insert")
+ /** Direction token -> code: 0 = out/directed, 1 = in, 2 = undirected. */
+ val directions = Map("out" -> 0, "in" -> 1, "undirected" -> 2, "u" -> 2, "directed" -> 0, "d" -> 0)
+ val consistencyLevel = Map("weak" -> 0, "strong" -> 1)
+
+ /** Normalize an element-type token to "edge" or "vertex".
+ * NOTE(review): any other token throws MatchError; kept as-is for compatibility. */
+ def toType(t: String) = {
+ t.trim().toLowerCase match {
+ case "e" | "edge" => "edge"
+ case "v" | "vertex" => "vertex"
+ }
+ }
+
+ /** Parse a direction token into its byte code; None for unknown tokens. */
+ def toDir(direction: String): Option[Byte] = {
+ val code = direction.trim().toLowerCase match {
+ case "directed" | "d" | "out" => Some(0)
+ case "undirected" | "u" => Some(2)
+ case "in" => Some(1)
+ case _ => None
+ }
+ code.map(_.toByte)
+ }
+
+ /** Parse a direction token into its int code; unknown tokens fall back to 2 (undirected). */
+ def toDirection(direction: String): Int = {
+ direction.trim().toLowerCase match {
+ case "directed" | "d" | "out" => 0
+ case "in" => 1
+ case "undirected" | "u" => 2
+ case _ => 2
+ }
+ }
+
+ /** Inverse of toDirection for the three valid codes (MatchError otherwise). */
+ def fromDirection(direction: Int) = {
+ direction match {
+ case 0 => "out"
+ case 1 => "in"
+ case 2 => "undirected"
+ }
+ }
+
+ /** Swap out (0) and in (1); undirected (2) is its own inverse. */
+ def toggleDir(dir: Int) = {
+ dir match {
+ case 0 => 1
+ case 1 => 0
+ case 2 => 2
+ case _ => throw new UnsupportedOperationException(s"toggleDirection: $dir")
+ }
+ }
+
+ /** Parse an operation token into its byte code; None for unknown tokens. */
+ def toOp(op: String): Option[Byte] = {
+ op.trim() match {
+ case "i" | "insert" => Some(0)
+ case "u" | "update" => Some(1)
+ case "increment" => Some(2)
+ case "d" | "delete" => Some(3)
+ case "deleteAll" => Some(4)
+ case "insertBulk" => Some(5)
+ case "incrementCount" => Some(6)
+ case _ => None
+ }
+ }
+
+ /** Map an op byte back to its canonical operation name.
+ * @throws UnsupportedOperationException for bytes outside 0..6. */
+ def fromOp(op: Byte): String = {
+ op match {
+ case 0 => "insert"
+ case 1 => "update"
+ case 2 => "increment"
+ case 3 => "delete"
+ case 4 => "deleteAll"
+ case 5 => "insertBulk"
+ case 6 => "incrementCount"
+ case _ =>
+ // Fixed: the original message listed the wrong byte->name pairs.
+ throw new UnsupportedOperationException(
+ s"op : $op (only support 0(insert),1(update),2(increment),3(delete),4(deleteAll),5(insertBulk),6(incrementCount))")
+ }
+ }
+
+ /** Fold a signed hash into a non-negative int: negative h maps to -(h + 1). */
+ def transformHash(h: Int): Int =
+ if (h < 0) -1 * (h + 1) else h
+
+ /** Non-negative murmur3 hash of a string. */
+ def murmur3Int(s: String): Int = transformHash(MurmurHash3.stringHash(s))
+
+ /** Short-sized murmur3 hash: the high BitsForMurMurHash bits of the non-negative hash. */
+ def murmur3(s: String): Short = {
+ val positiveHash = transformHash(MurmurHash3.stringHash(s)) >> BitsForMurMurHash
+ positiveHash.toShort
+ }
+
+ /** Trim then split; an empty (or whitespace-only) input yields an empty Seq.
+ * NOTE(review): parameter name "delemiter" is a typo, kept for named-argument compatibility. */
+ def smartSplit(s: String, delemiter: String) = {
+ val trimmed = s.trim()
+ if (trimmed.isEmpty) Seq[String]()
+ else trimmed.split(delemiter).toList
+ }
+
+ /** Split a log line on tab characters. */
+ def split(line: String) = TOKEN_DELIMITER.split(line)
+
+ /** Parse either a JSON array of strings (input starting with '[') or a
+ * newline-separated string into a list of lines. */
+ def parseString(s: String): List[String] = {
+ if (!s.startsWith("[")) s.split("\n").toList
+ else Json.parse(s).asOpt[List[String]].getOrElse(List.empty[String])
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
new file mode 100644
index 0000000..8b9a228
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -0,0 +1,134 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json._
+
+
+trait JSONParser {
+
+ //TODO: check result notation on bigDecimal.
+ def innerValToJsValue(innerVal: InnerValLike, dataType: String): Option[JsValue] = {
+ try {
+ val dType = InnerVal.toInnerDataType(dataType)
+ val jsValue = dType match {
+ case InnerVal.STRING => JsString(innerVal.value.asInstanceOf[String])
+ case InnerVal.BOOLEAN => JsBoolean(innerVal.value.asInstanceOf[Boolean])
+ case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE =>
+ // case t if InnerVal.NUMERICS.contains(t) =>
+ innerVal.value match {
+ case l: Long => JsNumber(l)
+ case i: Int => JsNumber(i)
+ case s: Short => JsNumber(s.toLong)
+ case b: Byte => JsNumber(b.toLong)
+ case f: Float => JsNumber(f.toDouble)
+ case d: Double =>
+ // JsNumber(d)
+ dType match {
+ case InnerVal.BYTE => JsNumber(d.toInt)
+ case InnerVal.SHORT => JsNumber(d.toInt)
+ case InnerVal.INT => JsNumber(d.toInt)
+ case InnerVal.LONG => JsNumber(d.toLong)
+ case InnerVal.FLOAT => JsNumber(d.toDouble)
+ case InnerVal.DOUBLE => JsNumber(d.toDouble)
+ case _ => throw new RuntimeException(s"$innerVal, $dType => $dataType")
+ }
+ case num: BigDecimal =>
+ // JsNumber(num)
+ // JsNumber(InnerVal.scaleNumber(num.asInstanceOf[BigDecimal], dType))
+ dType match {
+ case InnerVal.BYTE => JsNumber(num.toInt)
+ case InnerVal.SHORT => JsNumber(num.toInt)
+ case InnerVal.INT => JsNumber(num.toInt)
+ case InnerVal.LONG => JsNumber(num.toLong)
+ case InnerVal.FLOAT => JsNumber(num.toDouble)
+ case InnerVal.DOUBLE => JsNumber(num.toDouble)
+ case _ => throw new RuntimeException(s"$innerVal, $dType => $dataType")
+ }
+ // JsNumber(num.toLong)
+ case _ => throw new RuntimeException(s"$innerVal, Numeric Unknown => $dataType")
+ }
+ // JsNumber(InnerVal.scaleNumber(innerVal.asInstanceOf[BigDecimal], dType))
+ case _ => throw new RuntimeException(s"$innerVal, Unknown => $dataType")
+ }
+ Some(jsValue)
+ } catch {
+ case e: Exception =>
+ logger.info(s"JSONParser.innerValToJsValue: $e")
+ None
+ }
+ }
+
+ // def innerValToString(innerVal: InnerValLike, dataType: String): String = {
+ // val dType = InnerVal.toInnerDataType(dataType)
+ // InnerVal.toInnerDataType(dType) match {
+ // case InnerVal.STRING => innerVal.toString
+ // case InnerVal.BOOLEAN => innerVal.toString
+ // // case t if InnerVal.NUMERICS.contains(t) =>
+ // case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE =>
+ // BigDecimal(innerVal.toString).bigDecimal.toPlainString
+ // case _ => innerVal.toString
+ // // throw new RuntimeException("innerVal to jsValue failed.")
+ // }
+ // }
+
+ def toInnerVal(str: String, dataType: String, version: String): InnerValLike = {
+ //TODO:
+ // logger.error(s"toInnerVal: $str, $dataType, $version")
+ val s =
+ if (str.startsWith("\"") && str.endsWith("\"")) str.substring(1, str.length - 1)
+ else str
+ val dType = InnerVal.toInnerDataType(dataType)
+
+ dType match {
+ case InnerVal.STRING => InnerVal.withStr(s, version)
+ // case t if InnerVal.NUMERICS.contains(t) => InnerVal.withNumber(BigDecimal(s), version)
+ case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE =>
+ InnerVal.withNumber(BigDecimal(s), version)
+ case InnerVal.BOOLEAN => InnerVal.withBoolean(s.toBoolean, version)
+ case InnerVal.BLOB => InnerVal.withBlob(s.getBytes, version)
+ case _ =>
+ // InnerVal.withStr("")
+ throw new RuntimeException(s"illegal datatype for string: dataType is $dataType for $s")
+ }
+ }
+
+ def jsValueToInnerVal(jsValue: JsValue, dataType: String, version: String): Option[InnerValLike] = {
+ val ret = try {
+ val dType = InnerVal.toInnerDataType(dataType.toLowerCase())
+ jsValue match {
+ case n: JsNumber =>
+ dType match {
+ case InnerVal.STRING => Some(InnerVal.withStr(jsValue.toString, version))
+ // case t if InnerVal.NUMERICS.contains(t) =>
+ case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE =>
+ Some(InnerVal.withNumber(n.value, version))
+ case _ => None
+ }
+ case s: JsString =>
+ dType match {
+ case InnerVal.STRING => Some(InnerVal.withStr(s.value, version))
+ case InnerVal.BOOLEAN => Some(InnerVal.withBoolean(s.as[String].toBoolean, version))
+ // case t if InnerVal.NUMERICS.contains(t) =>
+ case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE =>
+ Some(InnerVal.withNumber(BigDecimal(s.value), version))
+ case _ => None
+ }
+ case b: JsBoolean =>
+ dType match {
+ case InnerVal.STRING => Some(InnerVal.withStr(b.toString, version))
+ case InnerVal.BOOLEAN => Some(InnerVal.withBoolean(b.value, version))
+ case _ => None
+ }
+ case _ =>
+ None
+ }
+ } catch {
+ case e: Exception =>
+ logger.error(e.getMessage)
+ None
+ }
+
+ ret
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
new file mode 100644
index 0000000..b355757
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -0,0 +1,368 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.types.HBaseType._
+import org.apache.s2graph.core.types._
+import play.api.libs.json.Reads._
+import play.api.libs.json._
+
+import scala.util.Try
+
+/**
+ * This is designed to be bridge between rest to s2core.
+ * s2core never use this for finding models.
+ */
object Management extends JSONParser {

  /** Plain data holders used to carry schema definitions parsed from REST JSON payloads. */
  object JsonModel {

    // NOTE(review): "datatType" is a typo for "dataType", but it is part of the public
    // field/constructor name of this case class; renaming it would break callers.
    case class Prop(name: String, defaultValue: String, datatType: String)

    // Exposes the companion as a (String, String, String) => Prop function value.
    object Prop extends ((String, String, String) => Prop)

    // An index definition: its name plus the property names it is built from.
    case class Index(name: String, propNames: Seq[String])

  }

  import HBaseType._

  // Compression algorithm used for newly created HBase tables unless overridden.
  val DefaultCompressionAlgorithm = "gz"


  /** Looks up a service by name, always bypassing the model cache. */
  def findService(serviceName: String) = {
    Service.findByName(serviceName, useCache = false)
  }

  /** Intended to delete a service; the actual deletion is currently disabled (no-op body). */
  def deleteService(serviceName: String) = {
    Service.findByName(serviceName).foreach { service =>
      // service.deleteAll()
    }
  }

  /**
   * Re-points an existing label at a new HTable name.
   *
   * Fails the returned Try with LabelNotExistException when the label is unknown,
   * or InvalidHTableException when the new name equals the label's current table.
   */
  def updateHTable(labelName: String, newHTableName: String): Try[Int] = Try {
    val targetLabel = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(s"Target label $labelName does not exist."))
    if (targetLabel.hTableName == newHTableName) throw new InvalidHTableException(s"New HTable name is already in use for target label.")

    Label.updateHTableName(targetLabel.label, newHTableName)
  }



  /**
   * Creates (or reuses) a service column under an existing service and registers
   * its property metadata, all inside a single model transaction.
   *
   * Throws RuntimeException when the owning service does not exist.
   */
  def createServiceColumn(serviceName: String,
                          columnName: String,
                          columnType: String,
                          props: Seq[Prop],
                          schemaVersion: String = DEFAULT_VERSION) = {

    Model withTx { implicit session =>
      val serviceOpt = Service.findByName(serviceName)
      serviceOpt match {
        case None => throw new RuntimeException(s"create service $serviceName has not been created.")
        case Some(service) =>
          val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion)
          // Register each declared property as column metadata (idempotent findOrInsert).
          for {
            Prop(propName, defaultValue, dataType) <- props
          } yield {
            ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType)
          }
      }
    }
  }

  /**
   * Deletes the named column of a service inside a transaction and returns its name.
   *
   * Throws RuntimeException when the service or the column cannot be found.
   */
  def deleteColumn(serviceName: String, columnName: String, schemaVersion: String = DEFAULT_VERSION) = {
    Model withTx { implicit session =>
      val service = Service.findByName(serviceName, useCache = false).getOrElse(throw new RuntimeException("Service not Found"))
      // NOTE(review): ServiceColumn.find appears to return an Option (getOrElse below) — confirm.
      val serviceColumns = ServiceColumn.find(service.id.get, columnName, useCache = false)
      val columnNames = serviceColumns.map { serviceColumn =>
        ServiceColumn.delete(serviceColumn.id.get)
        serviceColumn.columnName
      }

      columnNames.getOrElse(throw new RuntimeException("column not found"))
    }
  }

  /** Looks up a label by name, always bypassing the model cache. */
  def findLabel(labelName: String): Option[Label] = {
    Label.findByName(labelName, useCache = false)
  }

  /** Deletes the named label (if present) inside a transaction; returns the label name. */
  def deleteLabel(labelName: String) = {
    Model withTx { implicit session =>
      Label.findByName(labelName, useCache = false).foreach { label =>
        Label.deleteAll(label)
      }
      labelName
    }
  }

  /**
   * Adds the given indices to an existing label and returns the label.
   *
   * Each index's property names are resolved to meta sequence numbers; unknown
   * property names will fail the lookup in labelMetaMap.
   */
  def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = {
    Model withTx { implicit session =>
      val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found"))
      val labelMetaMap = label.metaPropsInvMap

      indices.foreach { index =>
        val metaSeq = index.propNames.map { name => labelMetaMap(name).seq }
        LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none")
      }

      label
    }
  }

  /** Registers a single property (label meta) on an existing label. */
  def addProp(labelStr: String, prop: Prop) = {
    Model withTx { implicit session =>
      val labelOpt = Label.findByName(labelStr)
      val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found"))

      // prop.datatType: see the typo note on JsonModel.Prop above.
      LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.datatType)
    }
  }

  /** Registers several properties (label metas) on an existing label in one transaction. */
  def addProps(labelStr: String, props: Seq[Prop]) = {
    Model withTx { implicit session =>
      val labelOpt = Label.findByName(labelStr)
      val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found"))

      props.map {
        case Prop(propName, defaultValue, dataType) =>
          LabelMeta.findOrInsert(label.id.get, propName, defaultValue, dataType)
      }
    }
  }

  /**
   * Registers a property on a vertex (service column).
   *
   * Throws RuntimeException when the service or column cannot be found.
   */
  def addVertexProp(serviceName: String,
                    columnName: String,
                    propsName: String,
                    propsType: String,
                    schemaVersion: String = DEFAULT_VERSION): ColumnMeta = {
    val result = for {
      service <- Service.findByName(serviceName, useCache = false)
      serviceColumn <- ServiceColumn.find(service.id.get, columnName)
    } yield {
      ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType)
    }
    result.getOrElse({
      throw new RuntimeException(s"add property on vertex failed")
    })
  }

  // NOTE(review): "Lable" is a typo for "Label", but the name is public API here.
  // Cached lookup, unlike findLabel above.
  def getServiceLable(label: String): Option[Label] = {
    Label.findByName(label, useCache = true)
  }

  /**
   * Conversion helpers: REST-level strings/JSON into core Edge/Vertex values.
   */

  /** Combines a label id with a parsed direction string into a LabelWithDirection. */
  def toLabelWithDirectionAndOp(label: Label, direction: String): Option[LabelWithDirection] = {
    for {
      labelId <- label.id
      dir = GraphUtil.toDirection(direction)
    } yield LabelWithDirection(labelId, dir)
  }

  /** Unwraps f(key), converting None into an InternalException that names the missing key. */
  def tryOption[A, R](key: A, f: A => Option[R]) = {
    f(key) match {
      case None => throw new GraphExceptions.InternalException(s"$key is not found in DB. create $key first.")
      case Some(r) => r
    }
  }

  /**
   * Builds an Edge from raw request strings.
   *
   * Vertex ids are converted using the label's column types; when direction is "in",
   * source/target roles are swapped so storage always sees the out-direction layout.
   * The timestamp is injected into the edge props under LabelMeta.timeStampSeq.
   *
   * Throws (via tryOption) when the label or operation is unknown.
   */
  def toEdge(ts: Long, operation: String, srcId: String, tgtId: String,
             labelStr: String, direction: String = "", props: String): Edge = {

    val label = tryOption(labelStr, getServiceLable)
    // Empty direction defaults to "out" (label-level default is commented out below).
    val dir =
      if (direction == "")
//        GraphUtil.toDirection(label.direction)
        GraphUtil.directions("out")
      else
        GraphUtil.toDirection(direction)

    //    logger.debug(s"$srcId, ${label.srcColumnWithDir(dir)}")
    //    logger.debug(s"$tgtId, ${label.tgtColumnWithDir(dir)}")

    val srcVertexId = toInnerVal(srcId, label.srcColumn.columnType, label.schemaVersion)
    val tgtVertexId = toInnerVal(tgtId, label.tgtColumn.columnType, label.schemaVersion)

    val srcColId = label.srcColumn.id.get
    val tgtColId = label.tgtColumn.id.get
    // For "in" direction the given src/tgt are swapped into storage (out) orientation.
    val (srcVertex, tgtVertex) = if (dir == GraphUtil.directions("out")) {
      (Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()),
        Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()))
    } else {
      (Vertex(SourceVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()),
        Vertex(TargetVertexId(srcColId, srcVertexId), System.currentTimeMillis()))
    }

    //    val dir = if (direction == "") GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction)
    val labelWithDir = LabelWithDirection(label.id.get, dir)
    val op = tryOption(operation, GraphUtil.toOp)

    // Malformed or non-object props JSON degrades to an empty object.
    val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
    val parsedProps = toProps(label, jsObject.fields).toMap
    val propsWithTs = parsedProps.map(kv => (kv._1 -> InnerValLikeWithTs(kv._2, ts))) ++
      Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, label.schemaVersion), ts))

    Edge(srcVertex, tgtVertex, labelWithDir, op, version = ts, propsWithTs = propsWithTs)

  }

  /**
   * Builds a Vertex from raw request strings.
   *
   * Throws RuntimeException when the service or column is unknown.
   */
  def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = {
    Service.findByName(serviceName) match {
      case None => throw new RuntimeException(s"$serviceName does not exist. create service first.")
      case Some(service) =>
        ServiceColumn.find(service.id.get, columnName) match {
          case None => throw new RuntimeException(s"$columnName is not exist. create service column first.")
          case Some(col) =>
            val idVal = toInnerVal(id, col.columnType, col.schemaVersion)
            val op = tryOption(operation, GraphUtil.toOp)
            // Malformed or non-object props JSON degrades to an empty object.
            val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
            val parsedProps = toProps(col, jsObject).toMap
            Vertex(VertexId(col.id.get, idVal), ts, parsedProps, op = op)
        }
    }
  }

  /**
   * Converts a JSON object's fields into (meta seq, inner value) pairs for a vertex column.
   * Fields with no registered metadata are silently skipped; a field whose value cannot be
   * converted to its declared type throws RuntimeException.
   */
  def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = {

    val props = for {
      (k, v) <- js.fields
      meta <- column.metasInvMap.get(k)
    } yield {
      val innerVal = jsValueToInnerVal(v, meta.dataType, column.schemaVersion).getOrElse(
        throw new RuntimeException(s"$k is not defined. create schema for vertex."))

      (meta.seq.toInt, innerVal)
    }
    props

  }

  /**
   * Converts (name, JSON value) pairs into (meta seq, inner value) pairs for a label.
   * Unlike the column variant above, unconvertible values are skipped (flatMapped away)
   * rather than raising.
   */
  def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(Byte, InnerValLike)] = {
    val props = for {
      (k, v) <- js
      meta <- label.metaPropsInvMap.get(k)
      innerVal <- jsValueToInnerVal(v, meta.dataType, label.schemaVersion)
    } yield (meta.seq, innerVal)

    props
  }


  /**
   * Renames a label. No-op when the old label does not exist or the new name is taken
   * (the conflicting-name failure is currently silenced — see commented throw).
   */
  def updateLabelName(oldLabelName: String, newLabelName: String) = {
    for {
      old <- Label.findByName(oldLabelName)
    } {
      Label.findByName(newLabelName) match {
        case None =>
          Label.updateName(oldLabelName, newLabelName)
        case Some(_) =>
        //        throw new RuntimeException(s"$newLabelName already exist")
      }
    }
  }
}
+
/**
 * Instance-level management operations that need the graph's storage backend
 * (physical table creation), in contrast to the schema-only helpers in the
 * companion object.
 */
class Management(graph: Graph) {
  import Management._

  val storage = graph.storage

  /**
   * Creates a physical table on the given cluster by delegating to the storage backend.
   *
   * @param zkAddr               cluster (ZooKeeper quorum) address
   * @param tableName            table to create
   * @param cfs                  column family names
   * @param regionMultiplier     pre-split size factor
   * @param ttl                  optional table TTL
   * @param compressionAlgorithm compression to use, "gz" by default
   */
  def createTable(zkAddr: String,
                  tableName: String,
                  cfs: List[String],
                  regionMultiplier: Int,
                  ttl: Option[Int],
                  compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit =
    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm)

  /**
   * HBase specific code: finds or inserts the service model, then creates its
   * backing table with "e" (edge) and "v" (vertex) column families.
   */
  def createService(serviceName: String,
                    cluster: String, hTableName: String,
                    preSplitSize: Int, hTableTTL: Option[Int],
                    compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = {

    Model withTx { implicit session =>
      val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
      /** create hbase table for service */
      storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
      service
    }
  }

  /**
   * HBase specific code: inserts all label models and, when a dedicated table is
   * requested, creates it (with the label's own TTL, or the service default when
   * no TTL is given).
   *
   * Fails with LabelAlreadyExistException when the label name is taken, and with
   * RuntimeException when a TTL is given without a table name.
   */
  def createLabel(label: String,
                  srcServiceName: String,
                  srcColumnName: String,
                  srcColumnType: String,
                  tgtServiceName: String,
                  tgtColumnName: String,
                  tgtColumnType: String,
                  isDirected: Boolean = true,
                  serviceName: String,
                  indices: Seq[Index],
                  props: Seq[Prop],
                  consistencyLevel: String,
                  hTableName: Option[String],
                  hTableTTL: Option[Int],
                  schemaVersion: String = DEFAULT_VERSION,
                  isAsync: Boolean,
                  compressionAlgorithm: String = "gz"): Try[Label] = {

    val labelOpt = Label.findByName(label, useCache = false)

    Model withTx { implicit session =>
      labelOpt match {
        case Some(l) =>
          throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
        case None =>
          /** create all models */
          val newLabel = Label.insertAll(label,
            srcServiceName, srcColumnName, srcColumnType,
            tgtServiceName, tgtColumnName, tgtColumnType,
            isDirected, serviceName, indices, props, consistencyLevel,
            hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)

          /** create hbase table */
          val service = newLabel.service
          (hTableName, hTableTTL) match {
            case (None, None) => // do nothing: label shares the service's default table
            case (None, Some(_)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
            case (Some(hbaseTableName), None) =>
              // create own hbase table with default ttl on service level.
              storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
            case (Some(hbaseTableName), Some(hbaseTableTTL)) =>
              // create own hbase table with own ttl.
              storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, Some(hbaseTableTTL), compressionAlgorithm)
          }
          newLabel
      }
    }
  }

  /**
   * Copies a label's full schema (props and indices) under a new name, optionally
   * into a dedicated HTable. Only used by bulk load jobs.
   *
   * BUGFIX: previously the "old label missing" path threw LabelAlreadyExistException,
   * contradicting its own message; it now throws LabelNotExistException.
   */
  def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
    val old = Label.findByName(oldLabelName).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
    if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")

    val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
    val allIndices = old.indices.map { index => Index(index.name, index.propNames) }

    createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
      old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
      old.isDirected, old.serviceName,
      allIndices, allProps,
      old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm)
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
new file mode 100644
index 0000000..c31aa79
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
@@ -0,0 +1,146 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.types.InnerValLike
+import play.api.libs.json.{JsNumber, JsString, JsValue}
+
/**
 * Ordering instances used when sorting query results by multiple keys.
 */
object OrderingUtil {

  /** Ordering over Play JSON values; only number-number and string-string pairs are comparable. */
  implicit object JsValueOrdering extends Ordering[JsValue] {
    override def compare(x: JsValue, y: JsValue): Int = {
      (x, y) match {
        case (JsNumber(xv), JsNumber(yv)) =>
          Ordering.BigDecimal.compare(xv, yv)
        case (JsString(xv), JsString(yv)) =>
          Ordering.String.compare(xv, yv)
        case _ => throw new Exception(s"unsupported type")
      }
    }
  }

  /** Delegates to InnerValLike's own compare. */
  implicit object InnerValLikeOrdering extends Ordering[InnerValLike] {
    override def compare(x: InnerValLike, y: InnerValLike): Int = {
      x.compare(y)
    }
  }

  /**
   * Ordering over heterogeneous values: both operands must be of the same supported
   * runtime type (Int, Long, Double, String, JsValue, or InnerValLike).
   */
  implicit object MultiValueOrdering extends Ordering[Any] {
    override def compare(x: Any, y: Any): Int = {
      (x, y) match {
        case (xv: Int, yv: Int) => implicitly[Ordering[Int]].compare(xv, yv)
        case (xv: Long, yv: Long) => implicitly[Ordering[Long]].compare(xv, yv)
        case (xv: Double, yv: Double) => implicitly[Ordering[Double]].compare(xv, yv)
        case (xv: String, yv: String) => implicitly[Ordering[String]].compare(xv, yv)
        case (xv: JsValue, yv: JsValue) => implicitly[Ordering[JsValue]].compare(xv, yv)
        case (xv: InnerValLike, yv: InnerValLike) => implicitly[Ordering[InnerValLike]].compare(xv, yv)
        case _ =>
          // BUGFIX: this match was non-exhaustive, so unsupported or mismatched type
          // pairs crashed with an opaque MatchError. Fail with an informative message.
          throw new IllegalArgumentException(
            s"unsupported or mismatched types for ordering: ${x.getClass.getName} vs ${y.getClass.getName}")
      }
    }
  }

  /**
   * Builds an ordering over 4-tuples that compares component-by-component,
   * flipping the operands for positions marked descending (false).
   *
   * Only the first min(ascendingLs.length, 4) components participate in the
   * comparison; the remainder are ignored, matching the original behavior.
   *
   * @param ascendingLs per-position sort direction (true = ascending)
   */
  def TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]): Ordering[(T, T, T, T)] = {
    new Ordering[(T, T, T, T)] {
      // Accessors for the four tuple components, in comparison order.
      private val accessors: Seq[((T, T, T, T)) => T] = Seq(_._1, _._2, _._3, _._4)

      override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
        // First non-zero component comparison decides; equal across all -> 0.
        ascendingLs.zip(accessors).iterator
          .map { case (ascending, get) =>
            if (ascending) ord.compare(get(tx), get(ty)) else ord.compare(get(ty), get(tx))
          }
          .find(_ != 0)
          .getOrElse(0)
      }
    }
  }
}
+
/**
 * Lexicographic ordering over sequences with a per-position sort direction.
 *
 * Position i uses ascendingLs(i) when present, otherwise defaultAscending;
 * descending positions compare with the operands flipped. When one sequence
 * is a strict prefix of the other, the shorter sequence sorts first.
 *
 * @param ascendingLs      per-position direction flags (true = ascending)
 * @param defaultAscending direction for positions beyond ascendingLs
 * @param ord              element ordering
 */
class SeqMultiOrdering[T](ascendingLs: Seq[Boolean], defaultAscending: Boolean = true)(implicit ord: Ordering[T]) extends Ordering[Seq[T]] {
  override def compare(x: Seq[T], y: Seq[T]): Int = {
    val xs = x.iterator
    val ys = y.iterator
    val directions = ascendingLs.iterator

    // Walk both sequences in lockstep until one comparison decides.
    var result = 0
    while (result == 0 && xs.hasNext && ys.hasNext) {
      val ascending = if (directions.hasNext) directions.next() else defaultAscending
      val a = xs.next()
      val b = ys.next()
      result = if (ascending) ord.compare(a, b) else ord.compare(b, a)
    }

    if (result != 0) result
    // All shared positions equal: the sequence with elements remaining is larger.
    else Ordering.Boolean.compare(xs.hasNext, ys.hasNext)
  }
}
+
+//class TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]) extends Ordering[(T, T, T, T)] {
+// override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
+// val len = ascendingLs.length
+// val it = ascendingLs.iterator
+// if (len >= 1) {
+// val (x, y) = it.next() match {
+// case true => tx -> ty
+// case false => ty -> tx
+// }
+// val compare1 = ord.compare(x._1, y._1)
+// if (compare1 != 0) return compare1
+// }
+//
+// if (len >= 2) {
+// val (x, y) = it.next() match {
+// case true => tx -> ty
+// case false => ty -> tx
+// }
+// val compare2 = ord.compare(x._2, y._2)
+// if (compare2 != 0) return compare2
+// }
+//
+// if (len >= 3) {
+// val (x, y) = it.next() match {
+// case true => tx -> ty
+// case false => ty -> tx
+// }
+// val compare3 = ord.compare(x._3, y._3)
+// if (compare3 != 0) return compare3
+// }
+//
+// if (len >= 4) {
+// val (x, y) = it.next() match {
+// case true => tx -> ty
+// case false => ty -> tx
+// }
+// val compare4 = ord.compare(x._4, y._4)
+// if (compare4 != 0) return compare4
+// }
+// 0
+// }
+//}