You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/30 12:07:18 UTC
[2/7] incubator-s2graph git commit: [S2GRAPH-130]: Edge.propsWithTs
data type should be changed into mutable to support setter interface exist in
tp3. - Make Vertex/Edge/Graph to implement Tinkerpop3. - Change data type of
Edge's propsWithTs to java.util.Map[String, S2Property[_]].
[S2GRAPH-130]: The data type of Edge.propsWithTs should be changed to a mutable one to support the setter interface that exists in TinkerPop 3 (tp3).
- Make Vertex/Edge/Graph implement the TinkerPop 3 interfaces.
- Change the data type of Edge's propsWithTs to java.util.Map[String, S2Property[_]].
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6356573e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6356573e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6356573e
Branch: refs/heads/master
Commit: 6356573e6a658dbfeb240bdee642d055991e5ac2
Parents: 292174e
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Nov 24 12:14:08 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 24 12:14:08 2016 +0900
----------------------------------------------------------------------
.../loader/subscriber/GraphSubscriber.scala | 2 +-
.../loader/subscriber/TransferToHFile.scala | 4 +-
.../s2graph/loader/subscriber/WalLogStat.scala | 2 +-
.../loader/subscriber/WalLogToHDFS.scala | 2 +-
.../scala/org/apache/s2graph/core/Edge.scala | 442 +++++++++++++-----
.../scala/org/apache/s2graph/core/Graph.scala | 246 +++++++---
.../org/apache/s2graph/core/QueryParam.scala | 4 +-
.../org/apache/s2graph/core/QueryResult.scala | 2 +-
.../org/apache/s2graph/core/S2Property.scala | 33 +-
.../scala/org/apache/s2graph/core/Vertex.scala | 24 +-
.../s2graph/core/rest/RequestParser.scala | 8 +-
.../apache/s2graph/core/storage/Storage.scala | 58 ++-
.../tall/IndexEdgeDeserializable.scala | 80 ++--
.../wide/IndexEdgeDeserializable.scala | 78 ++--
.../tall/SnapshotEdgeDeserializable.scala | 8 +-
.../wide/SnapshotEdgeDeserializable.scala | 8 +-
.../org/apache/s2graph/core/EdgeTest.scala | 457 +++----------------
.../core/Integrate/IntegrateCommon.scala | 2 +
.../core/Integrate/WeakLabelDeleteTest.scala | 5 +-
.../s2graph/core/parsers/WhereParserTest.scala | 31 +-
.../core/storage/hbase/IndexEdgeTest.scala | 206 ++++-----
.../loader/core/CounterEtlFunctions.scala | 5 +-
22 files changed, 850 insertions(+), 857 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index 05aed34..b25bc84 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -106,7 +106,7 @@ object GraphSubscriberHelper extends WithKafka {
(statFunc: (String, Int) => Unit): Iterable[GraphElement] = {
(for (msg <- msgs) yield {
statFunc("total", 1)
- Graph.toGraphElement(msg, labelMapping) match {
+ g.toGraphElement(msg, labelMapping) match {
case Some(e) if e.isInstanceOf[Edge] =>
statFunc("EdgeParseOk", 1)
e.asInstanceOf[Edge]
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 9ebff03..3345d56 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -101,7 +101,7 @@ object TransferToHFile extends SparkApp {
val ts = System.currentTimeMillis()
val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
- val edge = Edge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
+ val edge = GraphSubscriberHelper.g.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
edge.edgesWithIndex.flatMap { indexEdge =>
GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
@@ -125,7 +125,7 @@ object TransferToHFile extends SparkApp {
def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
val kvs = for {
s <- strs
- element <- Graph.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge]
+ element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge]
edge = element.asInstanceOf[Edge]
putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate)
} yield {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
index d47e648..5b68754 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
@@ -69,7 +69,7 @@ object WalLogStat extends SparkApp with WithKafka {
val phase = System.getProperty("phase")
GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
partition.map { case (key, msg) =>
- Graph.toGraphElement(msg) match {
+ GraphSubscriberHelper.g.toGraphElement(msg) match {
case Some(elem) =>
val serviceName = elem.serviceName
msg.split("\t", 7) match {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
index a8fc4df..0f69dc7 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
@@ -92,7 +92,7 @@ object WalLogToHDFS extends SparkApp with WithKafka {
GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
partition.flatMap { case (key, msg) =>
- val optMsg = Graph.toGraphElement(msg).flatMap { element =>
+ val optMsg = GraphSubscriberHelper.g.toGraphElement(msg).flatMap { element =>
val arr = msg.split("\t", 7)
val service = element.serviceName
val label = arr(5)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index b27a05e..87f9cd7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -20,50 +20,57 @@
package org.apache.s2graph.core
import java.util
+import java.util.function.BiConsumer
+import org.apache.s2graph.core.Edge.{Props, State}
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.{Direction, Edge => TpEdge, Graph => TpGraph, Property}
import play.api.libs.json.{JsNumber, JsObject, Json}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{Map => MutableMap}
import scala.util.hashing.MurmurHash3
-import org.apache.tinkerpop.gremlin.structure.{Edge => TpEdge, Direction, Property, Graph => TpGraph}
-case class SnapshotEdge(srcVertex: Vertex,
+case class SnapshotEdge(graph: Graph,
+ srcVertex: Vertex,
tgtVertex: Vertex,
label: Label,
- direction: Int,
+ dir: Int,
op: Byte,
version: Long,
- private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
+ private val propsWithTs: Props,
pendingEdgeOpt: Option[Edge],
statusCode: Byte = 0,
lockTs: Option[Long],
tsInnerValOpt: Option[InnerValLike] = None) {
-
- lazy val labelWithDir = LabelWithDirection(label.id.get, direction)
- if (!propsWithTs.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
+ lazy val direction = GraphUtil.fromDirection(dir)
+ lazy val operation = GraphUtil.fromOp(op)
+ lazy val edge = toEdge
+ lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
+// if (!propsWithTs.contains(LabelMeta.timestamp.name)) throw new Exception("Timestamp is required.")
// val label = Label.findById(labelWithDir.labelId)
lazy val schemaVer = label.schemaVersion
- lazy val propsWithoutTs = propsWithTs.mapValues(_.innerVal)
- lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.toString().toLong
+ lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong
- def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.map(kv => kv._1.seq -> kv._2).toSeq)
+ def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
def allPropsDeleted = Edge.allPropsDeleted(propsWithTs)
def toEdge: Edge = {
- val ts = propsWithTs.get(LabelMeta.timestamp).map(v => v.ts).getOrElse(version)
- Edge(srcVertex, tgtVertex, label, direction, op,
+ Edge(graph, srcVertex, tgtVertex, label, dir, op,
version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt,
statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
}
def propsWithName = (for {
- (meta, v) <- propsWithTs
+ (_, v) <- propsWithTs.asScala
+ meta = v.labelMeta
jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
} yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
@@ -71,26 +78,55 @@ case class SnapshotEdge(srcVertex: Vertex,
def toLogString() = {
List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t")
}
+
+ def property[V](key: String, value: V, ts: Long): S2Property[V] = {
+ val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge."))
+ val newProps = new S2Property(edge, labelMeta, key, value, ts)
+ propsWithTs.put(key, newProps)
+ newProps
+ }
+ override def hashCode(): Int = {
+ MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case e: SnapshotEdge =>
+ srcVertex.innerId == e.srcVertex.innerId &&
+ tgtVertex.innerId == e.tgtVertex.innerId &&
+ labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
+ pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode
+ case _ => false
+ }
+
+ override def toString(): String = {
+ Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction,
+ "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
+ "statusCode" -> statusCode, "lockTs" -> lockTs).toString
+ }
}
-case class IndexEdge(srcVertex: Vertex,
+case class IndexEdge(graph: Graph,
+ srcVertex: Vertex,
tgtVertex: Vertex,
label: Label,
- direction: Int,
+ dir: Int,
op: Byte,
version: Long,
labelIndexSeq: Byte,
- private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
+ private val propsWithTs: Props,
tsInnerValOpt: Option[InnerValLike] = None) {
// if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
// assert(props.contains(LabelMeta.timeStampSeq))
- lazy val labelWithDir = LabelWithDirection(label.id.get, direction)
+ lazy val direction = GraphUtil.fromDirection(dir)
+ lazy val operation = GraphUtil.fromOp(op)
+ lazy val edge = toEdge
+ lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in")
lazy val isOutEdge = !isInEdge
- lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.toString.toLong
- lazy val degreeEdge = propsWithTs.contains(LabelMeta.degree)
+ lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString.toLong
+ lazy val degreeEdge = propsWithTs.containsKey(LabelMeta.degree.name)
lazy val schemaVer = label.schemaVersion
lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
@@ -103,8 +139,8 @@ case class IndexEdge(srcVertex: Vertex,
/** TODO: make sure call of this class fill props as this assumes */
lazy val orders = for (meta <- labelIndexMetaSeqs) yield {
- propsWithTs.get(meta) match {
- case None =>
+ propsWithTs.get(meta.name) match {
+ case null =>
/**
* TODO: agly hack
@@ -120,12 +156,12 @@ case class IndexEdge(srcVertex: Vertex,
}
meta -> v
- case Some(v) => meta -> v.innerVal
+ case v => meta -> v.innerVal
}
}
- lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
- lazy val metas = for ((meta, v) <- propsWithTs if !ordersKeyMap.contains(meta)) yield meta -> v.innerVal
+ lazy val ordersKeyMap = orders.map { case (meta, _) => meta.name }.toSet
+ lazy val metas = for ((meta, v) <- propsWithTs.asScala if !ordersKeyMap.contains(meta)) yield v.labelMeta -> v.innerVal
// lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
@@ -135,12 +171,13 @@ case class IndexEdge(srcVertex: Vertex,
lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
def propsWithName = for {
- (meta, v) <- propsWithTs if meta.seq >= 0
+ (_, v) <- propsWithTs.asScala
+ meta = v.labelMeta
jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
} yield meta.name -> jsValue
- def toEdge: Edge = Edge(srcVertex, tgtVertex, label, direction, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+ def toEdge: Edge = Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
// only for debug
def toLogString() = {
@@ -152,52 +189,99 @@ case class IndexEdge(srcVertex: Vertex,
}
def property(labelMeta: LabelMeta): InnerValLikeWithTs = {
- propsWithTs.get(labelMeta).getOrElse(label.metaPropsDefaultMapInner(labelMeta))
+// propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(label.metaPropsDefaultMapInner(labelMeta))
+ if (propsWithTs.containsKey(labelMeta.name)) {
+ propsWithTs.get(labelMeta.name).innerValWithTs
+ } else {
+ label.metaPropsDefaultMapInner(labelMeta)
+ }
}
- def updatePropsWithTs(others: Map[LabelMeta, InnerValLikeWithTs] = Map.empty): Map[LabelMeta, InnerValLikeWithTs] = {
+ def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = {
if (others.isEmpty) propsWithTs
- else propsWithTs ++ others
+ else {
+ val iter = others.entrySet().iterator()
+ while (iter.hasNext) {
+ val e = iter.next()
+ propsWithTs.put(e.getKey, e.getValue)
+ }
+ propsWithTs
+ }
+ }
+
+ def property[V](key: String, value: V, ts: Long): S2Property[V] = {
+ val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge."))
+ val newProps = new S2Property(edge, labelMeta, key, value, ts)
+ propsWithTs.put(key, newProps)
+ newProps
+ }
+ override def hashCode(): Int = {
+ MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId + "," + labelIndexSeq)
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case e: IndexEdge =>
+ srcVertex.innerId == e.srcVertex.innerId &&
+ tgtVertex.innerId == e.tgtVertex.innerId &&
+ labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
+ labelIndexSeq == e.labelIndexSeq
+ case _ => false
+ }
+
+ override def toString(): String = {
+ Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> dir,
+ "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString
+ ).toString
}
}
-case class Edge(srcVertex: Vertex,
- tgtVertex: Vertex,
+case class Edge(innerGraph: Graph,
+ srcVertex: Vertex,
+ var tgtVertex: Vertex,
innerLabel: Label,
dir: Int,
- op: Byte = GraphUtil.defaultOpByte,
- version: Long = System.currentTimeMillis(),
- private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
+ var op: Byte = GraphUtil.defaultOpByte,
+ var version: Long = System.currentTimeMillis(),
+ propsWithTs: Props = Edge.EmptyProps,
parentEdges: Seq[EdgeWithScore] = Nil,
originalEdgeOpt: Option[Edge] = None,
pendingEdgeOpt: Option[Edge] = None,
statusCode: Byte = 0,
lockTs: Option[Long] = None,
- tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge {
+ var tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge {
lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir)
lazy val schemaVer = innerLabel.schemaVersion
- lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.value match {
+ lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match {
case b: BigDecimal => b.longValue()
case l: Long => l
case i: Int => i.toLong
case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].")
}
- //FIXME
+ lazy val operation = GraphUtil.fromOp(op)
lazy val tsInnerVal = tsInnerValOpt.get.value
lazy val srcId = srcVertex.innerIdVal
lazy val tgtId = tgtVertex.innerIdVal
lazy val labelName = innerLabel.label
lazy val direction = GraphUtil.fromDirection(dir)
- def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
+ def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
- def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.map(kv => kv._1.seq -> kv._2).toSeq)
+ def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
- def updatePropsWithTs(others: Map[LabelMeta, InnerValLikeWithTs] = Map.empty): Map[LabelMeta, InnerValLikeWithTs] = {
- if (others.isEmpty) propsWithTs
- else propsWithTs ++ others
+ def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = {
+ val emptyProp = Edge.EmptyProps
+
+ propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
+ override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
+ })
+
+ others.forEach(new BiConsumer[String, S2Property[_]] {
+ override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
+ })
+
+ emptyProp
}
def propertyValue(key: String): Option[InnerValLikeWithTs] = {
@@ -212,7 +296,12 @@ case class Edge(srcVertex: Vertex,
}
def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= {
- propsWithTs.getOrElse(labelMeta, innerLabel.metaPropsDefaultMapInner(labelMeta))
+ // propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
+ if (propsWithTs.containsKey(labelMeta.name)) {
+ propsWithTs.get(labelMeta.name).innerValWithTs
+ } else {
+ innerLabel.metaPropsDefaultMapInner(labelMeta)
+ }
}
def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
@@ -242,14 +331,21 @@ case class Edge(srcVertex: Vertex,
lazy val properties = toProps()
- def props = propsWithTs.mapValues(_.innerVal)
+ def props = propsWithTs.asScala.mapValues(_.innerVal)
private def toProps(): Map[String, Any] = {
for {
(labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner
} yield {
- labelMeta.name -> propsWithTs.getOrElse(labelMeta, defaultVal).innerVal.value
+ // labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value
+ val value =
+ if (propsWithTs.containsKey(labelMeta.name)) {
+ propsWithTs.get(labelMeta.name).value
+ } else {
+ defaultVal.innerVal.value
+ }
+ labelMeta.name -> value
}
}
@@ -302,21 +398,21 @@ case class Edge(srcVertex: Vertex,
override def isAsync = innerLabel.isAsync
- def isDegree = propsWithTs.contains(LabelMeta.degree)
+ def isDegree = propsWithTs.containsKey(LabelMeta.degree.name)
// def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
// case Some(_) => props
// case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
// }
- def propsPlusTsValid = propsWithTs.filter(kv => LabelMeta.isValidSeq(kv._1.seq))
+ def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
def edgesWithIndex = for (labelOrder <- labelOrders) yield {
- IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+ IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
}
def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
- IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
+ IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
}
/** force direction as out on invertedEdge */
@@ -325,38 +421,28 @@ case class Edge(srcVertex: Vertex,
// val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
- val ret = SnapshotEdge(smaller, larger, innerLabel, GraphUtil.directions("out"), op, version,
- Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs,
+ property(LabelMeta.timestamp.name, ts, ts)
+ val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel,
+ GraphUtil.directions("out"), op, version, propsWithTs,
pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
ret
}
- override def hashCode(): Int = {
- MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
- }
-
- override def equals(other: Any): Boolean = other match {
- case e: Edge =>
- srcVertex.innerId == e.srcVertex.innerId &&
- tgtVertex.innerId == e.tgtVertex.innerId &&
- labelWithDir == e.labelWithDir
- case _ => false
- }
-
def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
"label" -> innerLabel.label, "service" -> innerLabel.serviceName)
def propsWithName =
for {
- (meta, v) <- props if meta.seq > 0
- jsValue <- innerValToJsValue(v, meta.dataType)
+ (_, v) <- propsWithTs.asScala
+ meta = v.labelMeta
+ jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
} yield meta.name -> jsValue
def updateTgtVertex(id: InnerValLike) = {
val newId = TargetVertexId(tgtVertex.id.colId, id)
val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props)
- Edge(srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+ Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
}
def rank(r: RankParam): Double =
@@ -364,17 +450,15 @@ case class Edge(srcVertex: Vertex,
else {
var sum: Double = 0
- for ((seq, w) <- r.keySeqAndWeights) {
- propsWithTs.get(seq) match {
- case None => // do nothing
- case Some(innerValWithTs) => {
- val cost = try innerValWithTs.innerVal.toString.toDouble catch {
- case e: Exception =>
- logger.error("toInnerval failed in rank", e)
- 1.0
- }
- sum += w * cost
+ for ((labelMeta, w) <- r.keySeqAndWeights) {
+ if (propsWithTs.containsKey(labelMeta.name)) {
+ val innerValWithTs = propsWithTs.get(labelMeta.name)
+ val cost = try innerValWithTs.innerVal.toString.toDouble catch {
+ case e: Exception =>
+ logger.error("toInnerval failed in rank", e)
+ 1.0
}
+ sum += w * cost
}
}
sum
@@ -385,23 +469,92 @@ case class Edge(srcVertex: Vertex,
List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, allPropsWithName).mkString("\t")
}
+ override def hashCode(): Int = {
+ MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case e: Edge =>
+ srcVertex.innerId == e.srcVertex.innerId &&
+ tgtVertex.innerId == e.tgtVertex.innerId &&
+ labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
+ pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode &&
+ parentEdges == e.parentEdges && originalEdgeOpt == originalEdgeOpt
+ case _ => false
+ }
+
+ override def toString(): String = {
+ Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction,
+ "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
+ "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs
+ ).toString
+ }
+
+ def checkProperty(key: String): Boolean = propsWithTs.containsKey(key)
+
+ def copyEdge(srcVertex: Vertex = srcVertex,
+ tgtVertex: Vertex = tgtVertex,
+ innerLabel: Label = innerLabel,
+ dir: Int = dir,
+ op: Byte = op,
+ version: Long = version,
+ propsWithTs: State = Edge.propsToState(this.propsWithTs),
+ parentEdges: Seq[EdgeWithScore] = parentEdges,
+ originalEdgeOpt: Option[Edge] = originalEdgeOpt,
+ pendingEdgeOpt: Option[Edge] = pendingEdgeOpt,
+ statusCode: Byte = statusCode,
+ lockTs: Option[Long] = lockTs,
+ tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
+ ts: Long = ts): Edge = {
+ val edge = new Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps,
+ parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+ Edge.fillPropsWithTs(edge, propsWithTs)
+ edge.property(LabelMeta.timestamp.name, ts, ts)
+ edge
+ }
+
+ def copyEdgeWithState(state: State, ts: Long): Edge = {
+ val newEdge = copy(propsWithTs = Edge.EmptyProps)
+ Edge.fillPropsWithTs(newEdge, state)
+ newEdge.property(LabelMeta.timestamp.name, ts, ts)
+ newEdge
+ }
+
+ def copyEdgeWithState(state: State): Edge = {
+ val newEdge = copy(propsWithTs = Edge.EmptyProps)
+ Edge.fillPropsWithTs(newEdge, state)
+ newEdge
+ }
+
override def vertices(direction: Direction): util.Iterator[structure.Vertex] = ???
override def properties[V](strings: String*): util.Iterator[Property[V]] = ???
- override def property[V](key: String): Property[V] = ???
+ override def property[V](key: String): Property[V] = {
+ val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
+ if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]]
+ else {
+ val default = innerLabel.metaPropsDefaultMapInner(labelMeta)
+ property(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]]
+ }
+ }
override def property[V](key: String, value: V): Property[V] = {
property(key, value, System.currentTimeMillis())
}
- def property[V](key: String, value: V, ts: Long): Property[V] = ???
+ def property[V](key: String, value: V, ts: Long): Property[V] = {
+ val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
+ val newProp = new S2Property[V](this, labelMeta, key, value, ts)
+ propsWithTs.put(key, newProp)
+ newProp
+ }
- override def remove(): Unit = ???
+ override def remove(): Unit = {}
- override def graph(): TpGraph = ???
-
- override def id(): AnyRef = ???
+ override def graph(): TpGraph = innerGraph
+
+ override def id(): AnyRef = (srcVertex.innerId, labelWithDir, tgtVertex.innerId)
override def label(): String = innerLabel.label
}
@@ -425,38 +578,63 @@ object Edge {
val incrementVersion = 1L
val minTsVal = 0L
- def toEdge(srcId: Any,
- tgtId: Any,
- labelName: String,
- direction: String,
- props: Map[String, Any] = Map.empty,
- ts: Long = System.currentTimeMillis(),
- operation: String = "insert"): Edge = {
- val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
-
- val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion)
- val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion)
+ /** now version information is required also **/
+ type Props = java.util.Map[String, S2Property[_]]
+ type State = Map[LabelMeta, InnerValLikeWithTs]
+ type PropsPairWithTs = (State, State, Long, String)
+ type MergeState = PropsPairWithTs => (State, Boolean)
+ type UpdateFunc = (Option[Edge], Edge, MergeState)
- val srcColId = label.srcColumn.id.get
- val tgtColId = label.tgtColumn.id.get
+ def EmptyProps = new java.util.HashMap[String, S2Property[_]]
+ def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs]
+ def sameProps(base: Props, other: Props): Boolean = {
+ if (base.size != other.size) false
+ else {
+ var ret = true
+ val iter = base.entrySet().iterator()
+ while (iter.hasNext) {
+ val e = iter.next()
+ if (!other.containsKey(e.getKey)) ret = false
+ else if (e.getValue != other.get(e.getKey)) ret = false
+ else {
- val srcVertex = Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis())
- val tgtVertex = Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis())
- val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+ }
+ }
+ val otherIter = other.entrySet().iterator()
+ while (otherIter.hasNext) {
+ val e = otherIter.next()
+ if (!base.containsKey(e.getKey)) ret = false
+ else if (e.getValue != base.get(e.getKey)) ret = false
+ else {
- val labelWithDir = LabelWithDirection(label.id.get, dir)
- val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
- val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
- val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+ }
+ }
+ ret
+ }
+// base.sameElements(other)
+ }
+ def fillPropsWithTs(snapshotEdge: SnapshotEdge, state: State): Unit = {
+ state.foreach { case (k, v) => snapshotEdge.property(k.name, v.innerVal.value, v.ts) }
+ }
+ def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = {
+ state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) }
+ }
+ def fillPropsWithTs(edge: Edge, state: State): Unit = {
+ state.foreach { case (k, v) => edge.property(k.name, v.innerVal.value, v.ts) }
+ }
- new Edge(srcVertex, tgtVertex, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
+ def propsToState(props: Props): State = {
+ props.asScala.map { case (k, v) =>
+ v.labelMeta -> v.innerValWithTs
+ }.toMap
}
- /** now version information is required also **/
- type State = Map[LabelMeta, InnerValLikeWithTs]
- type PropsPairWithTs = (State, State, Long, String)
- type MergeState = PropsPairWithTs => (State, Boolean)
- type UpdateFunc = (Option[Edge], Edge, MergeState)
+ def stateToProps(edge: Edge, state: State): Props = {
+ state.foreach { case (k, v) =>
+ edge.property(k.name, v.innerVal.value, v.ts)
+ }
+ edge.propsWithTs
+ }
def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean =
if (!props.contains(LabelMeta.lastDeletedAt)) false
@@ -467,6 +645,23 @@ object Edge {
propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
}
+  /**
+   * Reports whether every property in `props` carries a timestamp at or before the
+   * `lastDeletedAt` marker. Returns false when the marker is absent.
+   *
+   * The map is only read. The previous form removed the marker entry from the caller's
+   * map before scanning, so merely asking the question altered the properties.
+   */
+  def allPropsDeleted(props: Props): Boolean = {
+    val markerKey = LabelMeta.lastDeletedAt.name
+    if (!props.containsKey(markerKey)) false
+    else {
+      val lastDeletedAt = props.get(markerKey).ts
+      // The marker entry is skipped by name rather than removed; forall stops at the first newer property.
+      props.asScala.forall { case (name, prop) => name == markerKey || prop.ts <= lastDeletedAt }
+    }
+  }
+
def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, EdgeMutate) = {
// assert(invertedEdge.isEmpty)
// assert(requestEdge.op == GraphUtil.operations("delete"))
@@ -481,7 +676,8 @@ object Edge {
// logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
// logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
val oldPropsWithTs =
- if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs] else invertedEdge.get.propsWithTs
+ if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs]
+ else propsToState(invertedEdge.get.propsWithTs)
val funcs = requestEdges.map { edge =>
if (edge.op == GraphUtil.operations("insert")) {
@@ -514,7 +710,7 @@ object Edge {
for {
(requestEdge, func) <- requestWithFuncs
} {
- val (_newPropsWithTs, _) = func(prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer)
+ val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer)
prevPropsWithTs = _newPropsWithTs
// logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
}
@@ -530,7 +726,9 @@ object Edge {
// logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
// logger.error(s"$propsWithTs")
- (requestEdge.copy(propsWithTs = propsWithTs), edgeMutate)
+ val newEdge = requestEdge.copy(propsWithTs = EmptyProps)
+ fillPropsWithTs(newEdge, propsWithTs)
+ (newEdge, edgeMutate)
}
}
@@ -540,7 +738,7 @@ object Edge {
// both direction use same indices that is defined when label creation.
true
case Some(dir) =>
- if (dir != ie.direction) {
+ if (dir != ie.dir) {
// current labelIndex's direction is different with indexEdge's direction so don't touch
false
} else {
@@ -566,13 +764,14 @@ object Edge {
val newOp = snapshotEdgeOpt match {
case None => requestEdge.op
case Some(old) =>
- val oldMaxTs = old.propsWithTs.map(_._2.ts).max
+ val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max
if (oldMaxTs > requestEdge.ts) old.op
else requestEdge.op
}
- val newSnapshotEdgeOpt =
- Option(requestEdge.copy(op = newOp, propsWithTs = newPropsWithTs, version = newVersion).toSnapshotEdge)
+ val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs)
+
+ val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge)
// delete request must always update snapshot.
if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) {
// no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt.
@@ -587,12 +786,17 @@ object Edge {
val edgesToInsert =
if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
- else
- requestEdge.copy(
+ else {
+ val newEdge = requestEdge.copy(
version = newVersion,
- propsWithTs = newPropsWithTs,
+ propsWithTs = Edge.EmptyProps,
op = GraphUtil.defaultOpByte
- ).relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
+ )
+ newPropsWithTs.foreach { case (k, v) => newEdge.property(k.name, v.innerVal.value, v.ts) }
+
+ newEdge.relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
+ }
+
EdgeMutate(edgesToDelete = edgesToDelete,
edgesToInsert = edgesToInsert,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index a2b17ef..ec3f286 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -19,10 +19,11 @@
package org.apache.s2graph.core
+import java.util
import java.util.concurrent.Executors
import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.hadoop.fs.Path
+import org.apache.commons.configuration.Configuration
import org.apache.s2graph.core.GraphExceptions.{FetchAllStepFailException, FetchTimeoutException, LabelNotExistException}
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Model, Service}
@@ -30,8 +31,11 @@ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.storage.{SKeyValue, Storage}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.{DeferCache, Extensions, SafeUpdateCache, logger}
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
+import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.Graph.Variables
+import org.apache.tinkerpop.gremlin.structure.{Graph => TpGraph, Transaction}
import play.api.libs.json.{JsObject, Json}
-
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -83,62 +87,7 @@ object Graph {
var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
- def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
- val parts = GraphUtil.split(s)
- val logType = parts(2)
- val element = if (logType == "edge" | logType == "e") {
- /** current only edge is considered to be bulk loaded */
- labelMapping.get(parts(5)) match {
- case None =>
- case Some(toReplace) =>
- parts(5) = toReplace
- }
- toEdge(parts)
- } else if (logType == "vertex" | logType == "v") {
- toVertex(parts)
- } else {
- throw new GraphExceptions.JsonParseException("log type is not exist in log.")
- }
- element
- } recover {
- case e: Exception =>
- logger.error(s"[toElement]: $e", e)
- None
- } get
-
-
- def toVertex(s: String): Option[Vertex] = {
- toVertex(GraphUtil.split(s))
- }
-
- def toEdge(s: String): Option[Edge] = {
- toEdge(GraphUtil.split(s))
- }
-
- def toEdge(parts: Array[String]): Option[Edge] = Try {
- val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
- val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
- val tempDirection = if (parts.length >= 8) parts(7) else "out"
- val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
- val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
- Option(edge)
- } recover {
- case e: Exception =>
- logger.error(s"[toEdge]: $e", e)
- throw e
- } get
-
- def toVertex(parts: Array[String]): Option[Vertex] = Try {
- val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
- val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
- val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
- Option(vertex)
- } recover {
- case e: Throwable =>
- logger.error(s"[toVertex]: $e", e)
- throw e
- } get
def initStorage(graph: Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = {
val storageBackend = config.getString("s2graph.storage.backend")
@@ -326,7 +275,9 @@ object Graph {
/** Select */
val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
- val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+// val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+ val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
+
val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
/** OrderBy */
val orderByValues =
@@ -410,7 +361,7 @@ object Graph {
edge.propertyValues(queryOption.selectColumns) ++ initial
}
- val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+ val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
edgeWithScore.copy(edge = newEdge)
}
} else Nil
@@ -544,7 +495,7 @@ object Graph {
}
-class Graph(_config: Config)(implicit val ec: ExecutionContext) {
+class Graph(_config: Config)(implicit val ec: ExecutionContext) extends TpGraph {
import Graph._
@@ -948,20 +899,28 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
val head = filtered.head
val label = head.edge.innerLabel
val edgeWithScoreLs = filtered.map { edgeWithScore =>
- val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
- case "strong" =>
- val _newPropsWithTs = edgeWithScore.edge.updatePropsWithTs(
- Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
- )
-
- (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
- case _ =>
- val oldEdge = edgeWithScore.edge
- (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs())
- }
-
- val copiedEdge =
- edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
+ val edge = edgeWithScore.edge
+ val copiedEdge = label.consistencyLevel match {
+ case "strong" =>
+ edge.copyEdge(op = GraphUtil.operations("delete"),
+ version = requestTs, propsWithTs = Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
+ case _ =>
+ edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
+ }
+// val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
+// case "strong" =>
+// val edge = edgeWithScore.edge
+// edge.property(LabelMeta.timestamp.name, requestTs)
+// val _newPropsWithTs = edge.updatePropsWithTs()
+//
+// (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
+// case _ =>
+// val oldEdge = edgeWithScore.edge
+// (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs())
+// }
+//
+// val copiedEdge =
+// edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
// logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
@@ -1099,7 +1058,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
operation: String = "insert",
withWait: Boolean = true): Future[Boolean] = {
- val innerEdges = Seq(Edge.toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation))
+ val innerEdges = Seq(toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation))
mutateEdges(innerEdges, withWait).map(_.headOption.getOrElse(false))
}
@@ -1113,4 +1072,141 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
val innerVertices = Seq(Vertex.toVertex(serviceName, columnName, id, props.toMap, ts, operation))
mutateVertices(innerVertices, withWait).map(_.headOption.getOrElse(false))
}
+
+  /**
+   * Parses one log line into an edge or a vertex, chosen by the third field
+   * ("edge"/"e" or "vertex"/"v"). Any Exception raised while parsing is logged and
+   * turned into None.
+   *
+   * @param s            raw log line, split with GraphUtil.split
+   * @param labelMapping optional label-name substitutions applied to the sixth field of edge lines
+   */
+  def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = {
+    val parsed: Try[Option[GraphElement]] = Try {
+      val parts = GraphUtil.split(s)
+      parts(2) match {
+        case "edge" | "e" =>
+          // currently only edges are considered for bulk loading, so only they get the label rewrite
+          labelMapping.get(parts(5)).foreach { toReplace => parts(5) = toReplace }
+          toEdge(parts)
+        case "vertex" | "v" =>
+          toVertex(parts)
+        case _ =>
+          throw new GraphExceptions.JsonParseException("log type is not exist in log.")
+      }
+    }
+
+    parsed.recover {
+      case e: Exception =>
+        logger.error(s"[toElement]: $e", e)
+        None
+    }.get
+  }
+
+
+  /** Splits `s` with GraphUtil.split and delegates to the array-based `toVertex`. */
+  def toVertex(s: String): Option[Vertex] =
+    toVertex(GraphUtil.split(s))
+
+  /** Splits `s` with GraphUtil.split and delegates to the array-based `toEdge`. */
+  def toEdge(s: String): Option[Edge] =
+    toEdge(GraphUtil.split(s))
+
+  /**
+   * Builds an edge from split log fields: ts, operation, logType, srcId, tgtId, label,
+   * then an optional JSON props field and an optional direction field.
+   * An Exception raised while building is logged and rethrown.
+   */
+  def toEdge(parts: Array[String]): Option[Edge] = {
+    val attempt = Try {
+      val (ts, operation, _, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+      val props =
+        if (parts.length < 7) Map.empty[String, Any]
+        else fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj()))
+      // a missing direction, or anything other than "in", is treated as "out"
+      val direction = if (parts.length >= 8 && parts(7) == "in") "in" else "out"
+      Option(toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation))
+    }
+
+    attempt.recover {
+      case e: Exception =>
+        logger.error(s"[toEdge]: $e", e)
+        throw e
+    }.get
+  }
+
+  /**
+   * Builds a vertex from split log fields: ts, operation, logType, srcId, serviceName,
+   * columnName, then an optional JSON props field.
+   * A failure captured by Try is logged and rethrown.
+   */
+  def toVertex(parts: Array[String]): Option[Vertex] = {
+    val attempt = Try {
+      val (ts, operation, _, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+      val props =
+        if (parts.length < 7) Map.empty[String, Any]
+        else fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj()))
+      Option(Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation))
+    }
+
+    attempt.recover {
+      case e: Throwable =>
+        logger.error(s"[toVertex]: $e", e)
+        throw e
+    }.get
+  }
+
+  /**
+   * Creates a SnapshotEdge bound to this graph. The edge is constructed with
+   * `Edge.EmptyProps`, and `propsWithTs` is then applied through `Edge.fillPropsWithTs`.
+   */
+  def newSnapshotEdge(srcVertex: Vertex,
+                      tgtVertex: Vertex,
+                      label: Label,
+                      dir: Int,
+                      op: Byte,
+                      version: Long,
+                      propsWithTs: Edge.State,
+                      pendingEdgeOpt: Option[Edge],
+                      statusCode: Byte = 0,
+                      lockTs: Option[Long],
+                      tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
+    val created = new SnapshotEdge(
+      this, srcVertex, tgtVertex, label, dir, op, version, Edge.EmptyProps,
+      pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+
+    Edge.fillPropsWithTs(created, propsWithTs)
+    created
+  }
+
+  /**
+   * Creates an Edge bound to this graph. The edge is constructed with
+   * `Edge.EmptyProps`, and `propsWithTs` is then applied through `Edge.fillPropsWithTs`.
+   */
+  def newEdge(srcVertex: Vertex,
+              tgtVertex: Vertex,
+              innerLabel: Label,
+              dir: Int,
+              op: Byte = GraphUtil.defaultOpByte,
+              version: Long = System.currentTimeMillis(),
+              propsWithTs: Edge.State,
+              parentEdges: Seq[EdgeWithScore] = Nil,
+              originalEdgeOpt: Option[Edge] = None,
+              pendingEdgeOpt: Option[Edge] = None,
+              statusCode: Byte = 0,
+              lockTs: Option[Long] = None,
+              tsInnerValOpt: Option[InnerValLike] = None): Edge = {
+    val created = new Edge(
+      this, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps,
+      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+
+    Edge.fillPropsWithTs(created, propsWithTs)
+    created
+  }
+  /**
+   * Builds an Edge on this graph from raw ids and a label name.
+   *
+   * Throws LabelNotExistException when `labelName` is unknown, and RuntimeException when
+   * `direction` or `operation` cannot be resolved. `ts` is added to the props under the
+   * timestamp meta name and is also used as the edge version.
+   */
+  def toEdge(srcId: Any,
+             tgtId: Any,
+             labelName: String,
+             direction: String,
+             props: Map[String, Any] = Map.empty,
+             ts: Long = System.currentTimeMillis(),
+             operation: String = "insert"): Edge = {
+    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+
+    val srcInnerId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion)
+    val tgtInnerId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion)
+
+    val srcColumnId = label.srcColumn.id.get
+    val tgtColumnId = label.tgtColumn.id.get
+
+    val source = Vertex(SourceVertexId(srcColumnId, srcInnerId), System.currentTimeMillis())
+    val target = Vertex(TargetVertexId(tgtColumnId, tgtInnerId), System.currentTimeMillis())
+    val dirValue = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+
+    // result is unused, but the construction (including label.id.get) is kept as in the original
+    val labelWithDir = LabelWithDirection(label.id.get, dirValue)
+    val state = label.propsToInnerValsWithTs(props + (LabelMeta.timestamp.name -> ts), ts)
+    val opByte = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+    new Edge(this, source, target, label, dirValue, op = opByte, version = ts).copyEdgeWithState(state)
+  }
+
+ // TinkerPop Graph members: every body below is `???`, so calling any of them
+ // (including close()) throws scala.NotImplementedError until an implementation is supplied.
+ override def vertices(objects: AnyRef*): util.Iterator[structure.Vertex] = ???
+
+ override def tx(): Transaction = ???
+
+ override def edges(objects: AnyRef*): util.Iterator[structure.Edge] = ???
+
+ override def variables(): Variables = ???
+
+ override def configuration(): Configuration = ???
+
+ override def addVertex(objects: AnyRef*): structure.Vertex = ???
+
+ override def close(): Unit = ???
+
+ override def compute[C <: GraphComputer](aClass: Class[C]): C = ???
+
+ override def compute(): GraphComputer = ???
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 7b10709..170fd0b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -376,9 +376,9 @@ case class QueryParam(labelName: String,
val propVal =
if (InnerVal.isNumericType(labelMeta.dataType)) {
- InnerVal.withLong(edge.props(labelMeta).value.asInstanceOf[BigDecimal].longValue() + padding, label.schemaVersion)
+ InnerVal.withLong(edge.property(labelMeta.name).value.asInstanceOf[BigDecimal].longValue() + padding, label.schemaVersion)
} else {
- edge.props(labelMeta)
+ edge.property(labelMeta.name).asInstanceOf[S2Property[_]].innerVal
}
labelMeta -> propVal
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index d8416c2..3753d0f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -40,7 +40,7 @@ object QueryResult {
val edgeWithScores = for {
vertex <- query.vertices
} yield {
- val edge = Edge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
+ val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore, queryParam.label)
edgeWithScore
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index 67a9d4c..938a9bb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -1,29 +1,48 @@
package org.apache.s2graph.core
-import org.apache.hadoop.hbase.util.Bytes
+
import org.apache.s2graph.core.mysqls.LabelMeta
-import org.apache.s2graph.core.types.CanInnerValLike
-import org.apache.tinkerpop.gremlin.structure.{Element, Property}
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, CanInnerValLike}
+import org.apache.tinkerpop.gremlin.structure.{Property}
+
+import scala.util.hashing.MurmurHash3
-case class S2Property[V](element: Element,
+case class S2Property[V](element: Edge,
labelMeta: LabelMeta,
key: String,
value: V,
- ts: Long = System.currentTimeMillis()) extends Property[V] {
+ ts: Long) extends Property[V] {
import CanInnerValLike._
- lazy val innerVal = anyToInnerValLike.toInnerVal(value, labelMeta.label.schemaVersion)
+ lazy val innerVal = anyToInnerValLike.toInnerVal(value)(element.innerLabel.schemaVersion)
+ lazy val innerValWithTs = InnerValLikeWithTs(innerVal, ts)
def bytes: Array[Byte] = {
innerVal.bytes
}
def bytesWithTs: Array[Byte] = {
- Bytes.add(innerVal.bytes, Bytes.toBytes(ts))
+ innerValWithTs.bytes
}
override def isPresent: Boolean = ???
override def remove(): Unit = ???
+
+  /**
+   * Hashes exactly the fields that `equals` compares: labelId, seq, key, value and ts.
+   * The previous form hashed `labelMeta.id.get` instead of `labelMeta.seq`; `equals` never
+   * looks at `id`, so two equal properties were not guaranteed equal hashes, and `.get`
+   * fails when `id` is empty.
+   */
+  override def hashCode(): Int =
+    MurmurHash3.stringHash(s"${labelMeta.labelId},${labelMeta.seq},$key,$value,$ts")
+
+  /** Two properties are equal when their label id, meta seq, key, value and ts all match. */
+  override def equals(other: Any): Boolean = other match {
+    case that: S2Property[_] =>
+      val sameMeta = labelMeta.labelId == that.labelMeta.labelId && labelMeta.seq == that.labelMeta.seq
+      sameMeta && key == that.key && value == that.value && ts == that.ts
+    case _ =>
+      false
+  }
+
+  /** Renders labelMeta, key, value and ts as the string form of a Map. */
+  override def toString(): String = {
+    val fields = Map("labelMeta" -> labelMeta.toString, "key" -> key, "value" -> value, "ts" -> ts)
+    fields.toString
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
index bbd71ec..0ff4f98 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
@@ -19,15 +19,21 @@
package org.apache.s2graph.core
+import java.util
+
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId}
+import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
+import org.apache.tinkerpop.gremlin.structure.{Vertex => TpVertex, Direction, Edge, VertexProperty, Graph}
import play.api.libs.json.Json
+
case class Vertex(id: VertexId,
ts: Long = System.currentTimeMillis(),
props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
op: Byte = 0,
- belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement {
+ belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement with TpVertex {
val innerId = id.innerId
@@ -97,6 +103,22 @@ case class Vertex(id: VertexId,
else
Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName).mkString("\t")
}
+
+ // TinkerPop Vertex members: every body below is `???`, so calling any of them throws
+ // scala.NotImplementedError until an implementation is supplied.
+ // NOTE(review): `Edge` and `Graph` in these signatures appear to resolve to the TinkerPop
+ // types named in this file's import, not the s2graph classes - confirm.
+ override def vertices(direction: Direction, strings: String*): util.Iterator[TpVertex] = ???
+
+ override def edges(direction: Direction, strings: String*): util.Iterator[structure.Edge] = ???
+
+ override def property[V](cardinality: Cardinality, s: String, v: V, objects: AnyRef*): VertexProperty[V] = ???
+
+ override def addEdge(s: String, vertex: TpVertex, objects: AnyRef*): Edge = ???
+
+ override def properties[V](strings: String*): util.Iterator[VertexProperty[V]] = ???
+
+ override def remove(): Unit = ???
+
+ override def graph(): Graph = ???
+
+ override def label(): String = ???
}
object Vertex {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 8baf787..805a544 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -541,7 +541,7 @@ class RequestParser(graph: Graph) {
val elementsWithTsv = for {
edgeStr <- edgeStrs
str <- GraphUtil.parseString(edgeStr)
- element <- Graph.toGraphElement(str)
+ element <- graph.toGraphElement(str)
} yield (element, str)
elementsWithTsv
@@ -566,7 +566,7 @@ class RequestParser(graph: Graph) {
tgtId <- tgtIds.flatMap(jsValueToAny(_).toSeq)
} yield {
// val edge = Management.toEdge(graph, timestamp, operation, srcId, tgtId, label, direction, fromJsonToProperties(propsJson))
- val edge = Edge.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation)
+ val edge = graph.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation)
val tsv = (jsValue \ "direction").asOpt[String] match {
case None => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString).mkString("\t")
case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString, dir).mkString("\t")
@@ -690,7 +690,7 @@ class RequestParser(graph: Graph) {
labelName <- (json \ "label").asOpt[String]
direction = (json \ "direction").asOpt[String].getOrElse("out")
} yield {
- Edge.toEdge(from, to, labelName, direction, Map.empty)
+ graph.toEdge(from, to, labelName, direction, Map.empty)
}
}
@@ -700,7 +700,7 @@ class RequestParser(graph: Graph) {
for {
edgeStr <- edgeStrs
str <- GraphUtil.parseString(edgeStr)
- element <- Graph.toGraphElement(str)
+ element <- graph.toGraphElement(str)
} yield element
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index b1ef11d..26d6ad1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -113,20 +113,20 @@ abstract class Storage[Q, R](val graph: Graph,
* */
val snapshotEdgeDeserializers: Map[String, Deserializable[SnapshotEdge]] = Map(
- VERSION1 -> new SnapshotEdgeDeserializable,
- VERSION2 -> new SnapshotEdgeDeserializable,
- VERSION3 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable,
- VERSION4 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable
+ VERSION1 -> new SnapshotEdgeDeserializable(graph),
+ VERSION2 -> new SnapshotEdgeDeserializable(graph),
+ VERSION3 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph),
+ VERSION4 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph)
)
def snapshotEdgeDeserializer(schemaVer: String) =
snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
/** create deserializer that can parse stored CanSKeyValue into indexEdge. */
- val indexEdgeDeserializers: Map[String, Deserializable[IndexEdge]] = Map(
- VERSION1 -> new IndexEdgeDeserializable,
- VERSION2 -> new IndexEdgeDeserializable,
- VERSION3 -> new IndexEdgeDeserializable,
- VERSION4 -> new serde.indexedge.tall.IndexEdgeDeserializable
+ val indexEdgeDeserializers: Map[String, Deserializable[Edge]] = Map(
+ VERSION1 -> new IndexEdgeDeserializable(graph),
+ VERSION2 -> new IndexEdgeDeserializable(graph),
+ VERSION3 -> new IndexEdgeDeserializable(graph),
+ VERSION4 -> new serde.indexedge.tall.IndexEdgeDeserializable(graph)
)
def indexEdgeDeserializer(schemaVer: String) =
@@ -795,17 +795,25 @@ abstract class Storage[Q, R](val graph: Graph,
} yield {
val edge = edgeWithScore.edge
val score = edgeWithScore.score
- /** reverted direction */
- val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+
+ val edgeSnapshot = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()))
+ val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+ val edgeForward = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()))
+ val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
buildIncrementsAsync(indexEdge, -1L)
}
- val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
- val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
+
+ /** reverted direction */
+ val edgeRevert = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()))
+ val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
buildIncrementsAsync(indexEdge, -1L)
}
+
val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
writeToStorage(zkQuorum, mutations, withWait = true)
}
@@ -821,7 +829,7 @@ abstract class Storage[Q, R](val graph: Graph,
/** Parsing Logic: parse from kv from Storage into Edge */
def toEdge[K: CanSKeyValue](kv: K,
queryRequest: QueryRequest,
- cacheElementOpt: Option[IndexEdge],
+ cacheElementOpt: Option[Edge],
parentEdges: Seq[EdgeWithScore]): Option[Edge] = {
logger.debug(s"toEdge: $kv")
@@ -830,8 +838,8 @@ abstract class Storage[Q, R](val graph: Graph,
val queryParam = queryRequest.queryParam
val schemaVer = queryParam.label.schemaVersion
val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
- if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges))
- else indexEdgeOpt.map(indexEdge => indexEdge.toEdge)
+ if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copy(parentEdges = parentEdges))
+ else indexEdgeOpt
} catch {
case ex: Exception =>
logger.error(s"Fail on toEdge: ${kv.toString}, ${queryRequest}", ex)
@@ -898,7 +906,7 @@ abstract class Storage[Q, R](val graph: Graph,
val (degreeEdges, keyValues) = cacheElementOpt match {
case None => (Nil, kvs)
case Some(cacheElement) =>
- val head = cacheElement.toEdge
+ val head = cacheElement
if (!head.isDegree) (Nil, kvs)
else (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail)
}
@@ -968,13 +976,13 @@ abstract class Storage[Q, R](val graph: Graph,
val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, src), TargetVertexId(tgtColumn.id.get, tgt))
val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
- Edge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
+ graph.newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
case None =>
val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
val srcVId = SourceVertexId(srcColumn.id.get, src)
val srcV = Vertex(srcVId)
- Edge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
+ graph.newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
}
}
@@ -1075,13 +1083,15 @@ abstract class Storage[Q, R](val graph: Graph,
/** IndexEdge */
+ // Serializes `indexedEdge` with `amount` stored under LabelMeta.degree and marks each
+ // resulting key-value as an Increment with the label's durability.
def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
- val newProps = indexedEdge.updatePropsWithTs(Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)))
+ val newProps = indexedEdge.updatePropsWithTs()
+ // NOTE(review): put() mutates whatever map updatePropsWithTs() returned - confirm it is a fresh copy.
+ newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts))
val _indexedEdge = indexedEdge.copy(propsWithTs = newProps)
indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
}
+ // Serializes `indexedEdge` with `amount` stored under LabelMeta.count and marks each
+ // resulting key-value as an Increment with the label's durability.
def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
- val newProps = indexedEdge.updatePropsWithTs(Map(LabelMeta.count -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)))
+ val newProps = indexedEdge.updatePropsWithTs()
+ // The amount goes under LabelMeta.count, as in the removed line; LabelMeta.degree is what buildIncrementsAsync writes.
+ newProps.put(LabelMeta.count.name, new S2Property(indexedEdge.toEdge, LabelMeta.count, LabelMeta.count.name, amount, indexedEdge.ts))
val _indexedEdge = indexedEdge.copy(propsWithTs = newProps)
indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
}
@@ -1109,10 +1119,8 @@ abstract class Storage[Q, R](val graph: Graph,
}
def buildDegreePuts(edge: Edge, degreeVal: Long): Seq[SKeyValue] = {
- val kvs = edge.edgesWithIndexValid.flatMap { _indexEdge =>
- val newProps = Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, _indexEdge.ts, _indexEdge.schemaVer))
- val indexEdge = _indexEdge.copy(propsWithTs = newProps)
-
+ edge.property(LabelMeta.degree.name, degreeVal, edge.ts)
+ val kvs = edge.edgesWithIndexValid.flatMap { indexEdge =>
indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 2428173..c538e53 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -25,13 +25,14 @@ import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex}
+import org.apache.s2graph.core._
import scala.collection.immutable
object IndexEdgeDeserializable{
- def getNewInstance() = new IndexEdgeDeserializable()
+ def getNewInstance(graph: Graph) = new IndexEdgeDeserializable(graph)
}
-class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
+class IndexEdgeDeserializable(graph: Graph,
+ bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[Edge] {
import StorageDeserializable._
type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
@@ -40,7 +41,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
_kvs: Seq[T],
schemaVer: String,
- cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+ cacheElementOpt: Option[Edge]): Edge = {
assert(_kvs.size == 1)
@@ -59,19 +60,25 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
pos += 1
val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
-// val op = kv.row(pos)
-// pos += 1
+
+ val srcVertex = Vertex(srcVertexId, version)
+ //TODO:
+ val edge = graph.newEdge(srcVertex, null,
+ label, labelWithDir.dir, GraphUtil.defaultOpByte, version, Edge.EmptyState)
+ var tsVal = version
if (pos == kv.row.length) {
// degree
// val degreeVal = Bytes.toLong(kv.value)
val degreeVal = bytesToLongFunc(kv.value, 0)
- val ts = kv.timestamp
- val tsInnerValLikeWithTs = InnerValLikeWithTs.withLong(ts, ts, schemaVer)
- val props = Map(LabelMeta.timestamp -> tsInnerValLikeWithTs,
- LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer))
val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
- IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), label, labelWithDir.dir, GraphUtil.defaultOpByte, ts, labelIdxSeq, props, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
+
+ edge.property(LabelMeta.timestamp.name, version, version)
+ edge.property(LabelMeta.degree.name, degreeVal, version)
+ edge.tgtVertex = Vertex(tgtVertexId, version)
+ edge.op = GraphUtil.defaultOpByte
+ edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+ edge
} else {
// not degree edge
val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
@@ -85,60 +92,47 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
}
val op = kv.row(kv.row.length-1)
- val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs]
val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
-
/** process indexProps */
val size = idxPropsRaw.length
(0 until size).foreach { ith =>
val meta = index.sortKeyTypesArray(ith)
val (k, v) = idxPropsRaw(ith)
- if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
- else allProps += meta -> InnerValLikeWithTs(v, version)
+ if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue()
+
+ if (k == LabelMeta.degree) {
+ edge.property(LabelMeta.degree.name, v.value, version)
+ } else {
+ edge.property(meta.name, v.value, version)
+ }
}
-// for {
-// (meta, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw)
-// } {
-// if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
-// else {
-// allProps += meta -> InnerValLikeWithTs(v, version)
-// }
-// }
/** process props */
if (op == GraphUtil.operations("incrementCount")) {
// val countVal = Bytes.toLong(kv.value)
val countVal = bytesToLongFunc(kv.value, 0)
- allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+ edge.property(LabelMeta.count.name, countVal, version)
} else {
val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label)
props.foreach { case (k, v) =>
- allProps += (k -> InnerValLikeWithTs(v, version))
+ if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue()
+
+ edge.property(k.name, v.value, version)
}
}
- val _mergedProps = allProps.result()
- val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match {
- case None =>
- val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer)
- val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
- (mergedProps, tsInnerVal)
- case Some(tsInnerVal) =>
- (_mergedProps, tsInnerVal)
- }
-// val mergedProps =
-// if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps
-// else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
/** process tgtVertexId */
val tgtVertexId =
- mergedProps.get(LabelMeta.to) match {
- case None => tgtVertexIdRaw
- case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
- }
-
+ if (edge.checkProperty(LabelMeta.to.name)) {
+ val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
+ TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+ } else tgtVertexIdRaw
- IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
+ edge.tgtVertex = Vertex(tgtVertexId, version)
+ edge.op = op
+ edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+ edge
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 534667b..2b620a1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -23,10 +23,11 @@ import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex}
+import org.apache.s2graph.core._
import scala.collection.immutable
-class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
+class IndexEdgeDeserializable(graph: Graph,
+ bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[Edge] {
import StorageDeserializable._
type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
@@ -67,7 +68,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
_kvs: Seq[T],
schemaVer: String,
- cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+ cacheElementOpt: Option[Edge]): Edge = {
assert(_kvs.size == 1)
// val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
@@ -75,17 +76,22 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
val version = kv.timestamp
- val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
- (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
- }.getOrElse(parseRow(kv, schemaVer))
+// val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
+// (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+// }.getOrElse(parseRow(kv, schemaVer))
+ val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = parseRow(kv, schemaVer)
val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
+ val srcVertex = Vertex(srcVertexId, version)
+ //TODO:
+ val edge = graph.newEdge(srcVertex, null,
+ label, labelWithDir.dir, GraphUtil.defaultOpByte, version, Edge.EmptyState)
+ var tsVal = version
val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
else parseQualifier(kv, schemaVer)
- val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs]
val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
/** process indexProps */
@@ -93,52 +99,38 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
(0 until size).foreach { ith =>
val meta = index.sortKeyTypesArray(ith)
val (k, v) = idxPropsRaw(ith)
- if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
- else allProps += meta -> InnerValLikeWithTs(v, version)
+ if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue()
+
+ if (k == LabelMeta.degree) {
+ edge.property(LabelMeta.degree.name, v.value, version)
+ } else {
+ edge.property(meta.name, v.value, version)
+ }
}
-// for {
-// (seq, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw)
-// } {
-// if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
-// else allProps += seq -> InnerValLikeWithTs(v, version)
-// }
/** process props */
if (op == GraphUtil.operations("incrementCount")) {
- // val countVal = Bytes.toLong(kv.value)
+ // val countVal = Bytes.toLong(kv.value)
val countVal = bytesToLongFunc(kv.value, 0)
- allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
- } else if (kv.qualifier.isEmpty) {
- val countVal = bytesToLongFunc(kv.value, 0)
- allProps += (LabelMeta.degree -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+ edge.property(LabelMeta.count.name, countVal, version)
} else {
- val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label)
+ val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label)
props.foreach { case (k, v) =>
- allProps += (k -> InnerValLikeWithTs(v, version))
- }
- }
+ if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue()
- val _mergedProps = allProps.result()
- val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match {
- case None =>
- val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer)
- val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
- (mergedProps, tsInnerVal)
- case Some(tsInnerVal) =>
- (_mergedProps, tsInnerVal)
+ edge.property(k.name, v.value, version)
+ }
}
-// val mergedProps =
-// if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps
-// else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-
/** process tgtVertexId */
val tgtVertexId =
- mergedProps.get(LabelMeta.to) match {
- case None => tgtVertexIdRaw
- case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
- }
-
- IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
-
+ if (edge.checkProperty(LabelMeta.to.name)) {
+ val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
+ TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+ } else tgtVertexIdRaw
+
+ edge.tgtVertex = Vertex(tgtVertexId, version)
+ edge.op = op
+ edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+ edge
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index 91b8db1..37aafcf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -24,9 +24,9 @@ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId}
-import org.apache.s2graph.core.{Edge, SnapshotEdge, Vertex}
+import org.apache.s2graph.core.{Graph, Edge, SnapshotEdge, Vertex}
-class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
+class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEdge] {
def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
val statusCode = byte >> 4
@@ -87,7 +87,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
val pendingEdge =
- Edge(Vertex(srcVertexId, cellVersion),
+ graph.newEdge(Vertex(srcVertexId, cellVersion),
Vertex(tgtVertexId, cellVersion),
label, labelWithDir.dir, pendingEdgeOp,
cellVersion, pendingEdgeProps.toMap,
@@ -98,7 +98,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
(kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
}
- SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+ graph.newSnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
}