You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/12/01 08:30:15 UTC
[05/10] incubator-s2graph git commit: [S2GRAPH-131]: Add actual
implementation on interfaces from TinkerPop3 structure package. - Change
core.Edge/Vertex/Graph to core.S2Edge/S2Vertex/S2Graph. - Implement base
interfaces for tinkerpop3 structure package.
[S2GRAPH-131]: Add actual implementation on interfaces from TinkerPop3 structure package.
- Change core.Edge/Vertex/Graph to core.S2Edge/S2Vertex/S2Graph.
- Implement base interfaces for tinkerpop3 structure package.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e8c0bf20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e8c0bf20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e8c0bf20
Branch: refs/heads/master
Commit: e8c0bf20b517a2e9a752df63c156fc265b7365b6
Parents: 189bc41
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Nov 28 12:09:52 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Nov 28 12:09:52 2016 +0900
----------------------------------------------------------------------
.../loader/subscriber/GraphSubscriber.scala | 14 +-
.../loader/subscriber/TransferToHFile.scala | 6 +-
.../s2graph/loader/subscriber/WalLogStat.scala | 2 +-
.../loader/subscriber/WalLogToHDFS.scala | 2 +-
.../scala/org/apache/s2graph/core/Edge.scala | 956 ------------
.../scala/org/apache/s2graph/core/Graph.scala | 1238 ----------------
.../org/apache/s2graph/core/Management.scala | 2 +-
.../org/apache/s2graph/core/PostProcess.scala | 8 +-
.../org/apache/s2graph/core/QueryParam.scala | 23 +-
.../org/apache/s2graph/core/QueryResult.scala | 10 +-
.../scala/org/apache/s2graph/core/S2Edge.scala | 969 ++++++++++++
.../scala/org/apache/s2graph/core/S2Graph.scala | 1397 ++++++++++++++++++
.../org/apache/s2graph/core/S2Property.scala | 7 +-
.../org/apache/s2graph/core/S2Vertex.scala | 221 +++
.../apache/s2graph/core/S2VertexProperty.scala | 31 +-
.../scala/org/apache/s2graph/core/Vertex.scala | 132 --
.../apache/s2graph/core/mysqls/ColumnMeta.scala | 12 +-
.../apache/s2graph/core/mysqls/LabelMeta.scala | 1 -
.../s2graph/core/mysqls/ServiceColumn.scala | 15 +-
.../s2graph/core/parsers/WhereParser.scala | 34 +-
.../s2graph/core/rest/RequestParser.scala | 10 +-
.../apache/s2graph/core/rest/RestHandler.scala | 4 +-
.../apache/s2graph/core/storage/Storage.scala | 96 +-
.../core/storage/hbase/AsynchbaseStorage.scala | 10 +-
.../tall/IndexEdgeDeserializable.scala | 10 +-
.../wide/IndexEdgeDeserializable.scala | 8 +-
.../tall/SnapshotEdgeDeserializable.scala | 4 +-
.../wide/SnapshotEdgeDeserializable.scala | 4 +-
.../serde/vertex/VertexDeserializable.scala | 23 +-
.../serde/vertex/VertexSerializable.scala | 13 +-
.../s2graph/core/types/InnerValLike.scala | 17 +
.../apache/s2graph/core/types/VertexId.scala | 2 +-
.../org/apache/s2graph/core/EdgeTest.scala | 210 ---
.../core/Integrate/IntegrateCommon.scala | 6 +-
.../Integrate/tinkerpop/S2S2GraphTest.scala | 130 ++
.../org/apache/s2graph/core/S2EdgeTest.scala | 210 +++
.../s2graph/core/TestCommonWithModels.scala | 4 +-
.../core/benchmark/BenchmarkCommon.scala | 2 +-
.../s2graph/core/parsers/WhereParserTest.scala | 2 +-
.../s2graph/counter/helper/CounterAdmin.scala | 4 +-
.../counter/core/RankingCounterSpec.scala | 4 +-
.../loader/core/CounterEtlFunctions.scala | 14 +-
.../loader/core/CounterEtlFunctionsSpec.scala | 4 +-
.../org/apache/s2graph/rest/netty/Server.scala | 4 +-
.../apache/s2graph/rest/play/Bootstrap.scala | 6 +-
.../s2graph/rest/play/actors/QueueActor.scala | 8 +-
.../rest/play/controllers/EdgeController.scala | 12 +-
.../play/controllers/VertexController.scala | 4 +-
48 files changed, 3179 insertions(+), 2726 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index b25bc84..2352cdf 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -48,7 +48,7 @@ object GraphConfig {
if (kafkaBrokerList.isEmpty) Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "cache.ttl.seconds" -> cacheTTL)
else Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "kafka.metadata.broker.list" -> kafkaBrokers, "cache.ttl.seconds" -> cacheTTL)
- ConfigFactory.parseMap(newConf).withFallback(Graph.DefaultConfig)
+ ConfigFactory.parseMap(newConf).withFallback(S2Graph.DefaultConfig)
}
}
@@ -64,7 +64,7 @@ object GraphSubscriberHelper extends WithKafka {
private val sleepPeriod = 10000
private val maxTryNum = 10
- var g: Graph = null
+ var g: S2Graph = null
var management: Management = null
val conns = new scala.collection.mutable.HashMap[String, Connection]()
@@ -80,7 +80,7 @@ object GraphSubscriberHelper extends WithKafka {
if (g == null) {
val ec = ExecutionContext.Implicits.global
- g = new Graph(config)(ec)
+ g = new S2Graph(config)(ec)
management = new Management(g)
}
}
@@ -107,12 +107,12 @@ object GraphSubscriberHelper extends WithKafka {
(for (msg <- msgs) yield {
statFunc("total", 1)
g.toGraphElement(msg, labelMapping) match {
- case Some(e) if e.isInstanceOf[Edge] =>
+ case Some(e) if e.isInstanceOf[S2Edge] =>
statFunc("EdgeParseOk", 1)
- e.asInstanceOf[Edge]
- case Some(v) if v.isInstanceOf[Vertex] =>
+ e.asInstanceOf[S2Edge]
+ case Some(v) if v.isInstanceOf[S2Vertex] =>
statFunc("VertexParseOk", 1)
- v.asInstanceOf[Vertex]
+ v.asInstanceOf[S2Vertex]
case Some(x) =>
throw new RuntimeException(s">>>>> GraphSubscriber.toGraphElements: parsing failed. ${x.serviceName}")
case None =>
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index d1da319..dce085e 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -57,7 +57,7 @@ object TransferToHFile extends SparkApp {
/** build key values */
case class DegreeKey(vertexIdStr: String, labelName: String, direction: String)
- private def insertBulkForLoaderAsync(edge: Edge, createRelEdges: Boolean = true): List[PutRequest] = {
+ private def insertBulkForLoaderAsync(edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = {
val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(indexEdge) }
@@ -125,8 +125,8 @@ object TransferToHFile extends SparkApp {
def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
val kvs = for {
s <- strs
- element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge]
- edge = element.asInstanceOf[Edge]
+ element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[S2Edge]
+ edge = element.asInstanceOf[S2Edge]
putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate)
} yield {
val p = putRequest
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
index 5b68754..40f936d 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.loader.subscriber
import kafka.producer.KeyedMessage
import kafka.serializer.StringDecoder
-import org.apache.s2graph.core.Graph
+import org.apache.s2graph.core.S2Graph$
import org.apache.s2graph.spark.spark.{WithKafka, SparkApp}
import org.apache.spark.streaming.Durations._
import org.apache.spark.streaming.kafka.HasOffsetRanges
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
index 0f69dc7..23c3cda 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.loader.subscriber
import java.text.SimpleDateFormat
import java.util.Date
import kafka.serializer.StringDecoder
-import org.apache.s2graph.core.Graph
+import org.apache.s2graph.core.S2Graph$
import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.Durations._
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
deleted file mode 100644
index f10b4db..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ /dev/null
@@ -1,956 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import java.util
-import java.util.function.BiConsumer
-
-import org.apache.s2graph.core.Edge.{Props, State}
-import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
-import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
-import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.utils.logger
-import org.apache.tinkerpop.gremlin.structure
-import org.apache.tinkerpop.gremlin.structure.{Direction, Edge => TpEdge, Graph => TpGraph, Property}
-import play.api.libs.json.{JsNumber, JsObject, Json}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{Map => MutableMap}
-import scala.util.hashing.MurmurHash3
-
-case class SnapshotEdge(graph: Graph,
- srcVertex: Vertex,
- tgtVertex: Vertex,
- label: Label,
- dir: Int,
- op: Byte,
- version: Long,
- private val propsWithTs: Props,
- pendingEdgeOpt: Option[Edge],
- statusCode: Byte = 0,
- lockTs: Option[Long],
- tsInnerValOpt: Option[InnerValLike] = None) {
- lazy val direction = GraphUtil.fromDirection(dir)
- lazy val operation = GraphUtil.fromOp(op)
- lazy val edge = toEdge
- lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
-// if (!propsWithTs.contains(LabelMeta.timestamp.name)) throw new Exception("Timestamp is required.")
-
-// val label = Label.findById(labelWithDir.labelId)
- lazy val schemaVer = label.schemaVersion
- lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong
-
- def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
-
- def allPropsDeleted = Edge.allPropsDeleted(propsWithTs)
-
- def toEdge: Edge = {
- Edge(graph, srcVertex, tgtVertex, label, dir, op,
- version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt,
- statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
- }
-
- def propsWithName = (for {
- (_, v) <- propsWithTs.asScala
- meta = v.labelMeta
- jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
- } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
-
- // only for debug
- def toLogString() = {
- List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t")
- }
-
- def property[V](key: String, value: V, ts: Long): S2Property[V] = {
- val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge."))
- val newProps = new S2Property(edge, labelMeta, key, value, ts)
- propsWithTs.put(key, newProps)
- newProps
- }
- override def hashCode(): Int = {
- MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
- }
-
- override def equals(other: Any): Boolean = other match {
- case e: SnapshotEdge =>
- srcVertex.innerId == e.srcVertex.innerId &&
- tgtVertex.innerId == e.tgtVertex.innerId &&
- labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
- pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode
- case _ => false
- }
-
- override def toString(): String = {
- Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction,
- "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
- "statusCode" -> statusCode, "lockTs" -> lockTs).toString
- }
-}
-
-case class IndexEdge(graph: Graph,
- srcVertex: Vertex,
- tgtVertex: Vertex,
- label: Label,
- dir: Int,
- op: Byte,
- version: Long,
- labelIndexSeq: Byte,
- private val propsWithTs: Props,
- tsInnerValOpt: Option[InnerValLike] = None) {
-// if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
- // assert(props.contains(LabelMeta.timeStampSeq))
- lazy val direction = GraphUtil.fromDirection(dir)
- lazy val operation = GraphUtil.fromOp(op)
- lazy val edge = toEdge
- lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
-
- lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in")
- lazy val isOutEdge = !isInEdge
-
- lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString.toLong
- lazy val degreeEdge = propsWithTs.containsKey(LabelMeta.degree.name)
-
- lazy val schemaVer = label.schemaVersion
- lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
- lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta =>
- val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
- meta.seq -> innerVal
- }.toMap
-
- lazy val labelIndexMetaSeqs = labelIndex.sortKeyTypes
-
- /** TODO: make sure call of this class fill props as this assumes */
- lazy val orders = for (meta <- labelIndexMetaSeqs) yield {
- propsWithTs.get(meta.name) match {
- case null =>
-
- /**
- * TODO: agly hack
- * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
- */
- val v = meta match {
- case LabelMeta.timestamp=> InnerVal.withLong(version, schemaVer)
- case LabelMeta.to => toEdge.tgtVertex.innerId
- case LabelMeta.from => toEdge.srcVertex.innerId
- // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data.
- // throw new RuntimeException("_from on indexProps is not supported")
- case _ => toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
- }
-
- meta -> v
- case v => meta -> v.innerVal
- }
- }
-
- lazy val ordersKeyMap = orders.map { case (meta, _) => meta.name }.toSet
- lazy val metas = for ((meta, v) <- propsWithTs.asScala if !ordersKeyMap.contains(meta)) yield v.labelMeta -> v.innerVal
-
-// lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
-
- //TODO:
- // lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList
-
- lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
-
- def propsWithName = for {
- (_, v) <- propsWithTs.asScala
- meta = v.labelMeta
- jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
- } yield meta.name -> jsValue
-
-
- def toEdge: Edge = Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
-
- // only for debug
- def toLogString() = {
- List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t")
- }
-
- def property(key: String): Option[InnerValLikeWithTs] = {
- label.metaPropsInvMap.get(key).map(labelMeta => property(labelMeta))
- }
-
- def property(labelMeta: LabelMeta): InnerValLikeWithTs = {
-// propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(label.metaPropsDefaultMapInner(labelMeta))
- if (propsWithTs.containsKey(labelMeta.name)) {
- propsWithTs.get(labelMeta.name).innerValWithTs
- } else {
- label.metaPropsDefaultMapInner(labelMeta)
- }
- }
-
- def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = {
- if (others.isEmpty) propsWithTs
- else {
- val iter = others.entrySet().iterator()
- while (iter.hasNext) {
- val e = iter.next()
- propsWithTs.put(e.getKey, e.getValue)
- }
- propsWithTs
- }
- }
-
- def property[V](key: String, value: V, ts: Long): S2Property[V] = {
- val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge."))
- val newProps = new S2Property(edge, labelMeta, key, value, ts)
- propsWithTs.put(key, newProps)
- newProps
- }
- override def hashCode(): Int = {
- MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId + "," + labelIndexSeq)
- }
-
- override def equals(other: Any): Boolean = other match {
- case e: IndexEdge =>
- srcVertex.innerId == e.srcVertex.innerId &&
- tgtVertex.innerId == e.tgtVertex.innerId &&
- labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
- labelIndexSeq == e.labelIndexSeq
- case _ => false
- }
-
- override def toString(): String = {
- Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> dir,
- "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString
- ).toString
- }
-}
-
-case class Edge(innerGraph: Graph,
- srcVertex: Vertex,
- var tgtVertex: Vertex,
- innerLabel: Label,
- dir: Int,
- var op: Byte = GraphUtil.defaultOpByte,
- var version: Long = System.currentTimeMillis(),
- propsWithTs: Props = Edge.EmptyProps,
- parentEdges: Seq[EdgeWithScore] = Nil,
- originalEdgeOpt: Option[Edge] = None,
- pendingEdgeOpt: Option[Edge] = None,
- statusCode: Byte = 0,
- lockTs: Option[Long] = None,
- var tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge {
-
- lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir)
- lazy val schemaVer = innerLabel.schemaVersion
- lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match {
- case b: BigDecimal => b.longValue()
- case l: Long => l
- case i: Int => i.toLong
- case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].")
- }
-
- lazy val operation = GraphUtil.fromOp(op)
- lazy val tsInnerVal = tsInnerValOpt.get.value
- lazy val srcId = srcVertex.innerIdVal
- lazy val tgtId = tgtVertex.innerIdVal
- lazy val labelName = innerLabel.label
- lazy val direction = GraphUtil.fromDirection(dir)
-
- def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
-
- def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
-
- def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = {
- val emptyProp = Edge.EmptyProps
-
- propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
- override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
- })
-
- others.forEach(new BiConsumer[String, S2Property[_]] {
- override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
- })
-
- emptyProp
- }
-
- def propertyValue(key: String): Option[InnerValLikeWithTs] = {
- key match {
- case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts))
- case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts))
- case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts))
- case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts))
- case _ =>
- innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta))
- }
- }
-
- def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= {
- // propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
- if (propsWithTs.containsKey(labelMeta.name)) {
- propsWithTs.get(labelMeta.name).innerValWithTs
- } else {
- innerLabel.metaPropsDefaultMapInner(labelMeta)
- }
- }
-
- def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
- val labelMetas = for {
- key <- keys
- labelMeta <- innerLabel.metaPropsInvMap.get(key)
- } yield labelMeta
-
- propertyValuesInner(labelMetas)
- }
-
- def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
- if (labelMetas.isEmpty) {
- innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
- labelMeta -> propertyValueInner(labelMeta)
- }
- } else {
- // This is important since timestamp is required for all edges.
- (LabelMeta.timestamp +: labelMetas).map { labelMeta =>
- labelMeta -> propertyValueInner(labelMeta)
- }.toMap
- }
- }
-
-// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
- // assert(propsWithTs.contains(LabelMeta.timeStampSeq))
-
- lazy val properties = toProps()
-
- def props = propsWithTs.asScala.mapValues(_.innerVal)
-
-
- private def toProps(): Map[String, Any] = {
- for {
- (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner
- } yield {
- // labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value
- val value =
- if (propsWithTs.containsKey(labelMeta.name)) {
- propsWithTs.get(labelMeta.name).value
- } else {
- defaultVal.innerVal.value
- }
- labelMeta.name -> value
- }
- }
-
- def relatedEdges = {
- if (labelWithDir.isDirected) {
- val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
- if (skipReverse) List(this) else List(this, duplicateEdge)
- } else {
-// val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
-// val base = copy(labelWithDir = outDir)
- val base = copy(dir = GraphUtil.directions("out"))
- List(base, base.reverseSrcTgtEdge)
- }
- }
-
- // def relatedEdges = List(this)
-
- def srcForVertex = {
- val belongLabelIds = Seq(labelWithDir.labelId)
- if (labelWithDir.dir == GraphUtil.directions("in")) {
- innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
- } else {
- innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
- }
- }
-
- def tgtForVertex = {
- val belongLabelIds = Seq(labelWithDir.labelId)
- if (labelWithDir.dir == GraphUtil.directions("in")) {
- innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
- } else {
- innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
- }
- }
-
- def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
-
-// def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
- def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir))
-
- def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
-
- def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
-
- override def serviceName = innerLabel.serviceName
-
- override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|")
-
- override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|")
-
- override def isAsync = innerLabel.isAsync
-
- def isDegree = propsWithTs.containsKey(LabelMeta.degree.name)
-
-// def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
-// case Some(_) => props
-// case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
-// }
-
- def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
-
- def edgesWithIndex = for (labelOrder <- labelOrders) yield {
- IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
- }
-
- def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
- IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
- }
-
- /** force direction as out on invertedEdge */
- def toSnapshotEdge: SnapshotEdge = {
- val (smaller, larger) = (srcForVertex, tgtForVertex)
-
-// val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
-
- property(LabelMeta.timestamp.name, ts, ts)
- val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel,
- GraphUtil.directions("out"), op, version, propsWithTs,
- pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
- ret
- }
-
- def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
- "label" -> innerLabel.label, "service" -> innerLabel.serviceName)
-
- def propsWithName =
- for {
- (_, v) <- propsWithTs.asScala
- meta = v.labelMeta
- jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
- } yield meta.name -> jsValue
-
-
- def updateTgtVertex(id: InnerValLike) = {
- val newId = TargetVertexId(tgtVertex.id.column, id)
- val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props)
- Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
- }
-
- def rank(r: RankParam): Double =
- if (r.keySeqAndWeights.size <= 0) 1.0f
- else {
- var sum: Double = 0
-
- for ((labelMeta, w) <- r.keySeqAndWeights) {
- if (propsWithTs.containsKey(labelMeta.name)) {
- val innerValWithTs = propsWithTs.get(labelMeta.name)
- val cost = try innerValWithTs.innerVal.toString.toDouble catch {
- case e: Exception =>
- logger.error("toInnerval failed in rank", e)
- 1.0
- }
- sum += w * cost
- }
- }
- sum
- }
-
- def toLogString: String = {
- val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
- List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, allPropsWithName).mkString("\t")
- }
-
- override def hashCode(): Int = {
- MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
- }
-
- override def equals(other: Any): Boolean = other match {
- case e: Edge =>
- srcVertex.innerId == e.srcVertex.innerId &&
- tgtVertex.innerId == e.tgtVertex.innerId &&
- labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
- pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode &&
- parentEdges == e.parentEdges && originalEdgeOpt == originalEdgeOpt
- case _ => false
- }
-
- override def toString(): String = {
- Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction,
- "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
- "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs
- ).toString
- }
-
- def checkProperty(key: String): Boolean = propsWithTs.containsKey(key)
-
- def copyEdge(srcVertex: Vertex = srcVertex,
- tgtVertex: Vertex = tgtVertex,
- innerLabel: Label = innerLabel,
- dir: Int = dir,
- op: Byte = op,
- version: Long = version,
- propsWithTs: State = Edge.propsToState(this.propsWithTs),
- parentEdges: Seq[EdgeWithScore] = parentEdges,
- originalEdgeOpt: Option[Edge] = originalEdgeOpt,
- pendingEdgeOpt: Option[Edge] = pendingEdgeOpt,
- statusCode: Byte = statusCode,
- lockTs: Option[Long] = lockTs,
- tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
- ts: Long = ts): Edge = {
- val edge = new Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps,
- parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
- Edge.fillPropsWithTs(edge, propsWithTs)
- edge.property(LabelMeta.timestamp.name, ts, ts)
- edge
- }
-
- def copyEdgeWithState(state: State, ts: Long): Edge = {
- val newEdge = copy(propsWithTs = Edge.EmptyProps)
- Edge.fillPropsWithTs(newEdge, state)
- newEdge.property(LabelMeta.timestamp.name, ts, ts)
- newEdge
- }
-
- def copyEdgeWithState(state: State): Edge = {
- val newEdge = copy(propsWithTs = Edge.EmptyProps)
- Edge.fillPropsWithTs(newEdge, state)
- newEdge
- }
-
- override def vertices(direction: Direction): util.Iterator[structure.Vertex] = ???
-
- override def properties[V](strings: String*): util.Iterator[Property[V]] = ???
-
- override def property[V](key: String): Property[V] = {
- val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
- if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]]
- else {
- val default = innerLabel.metaPropsDefaultMapInner(labelMeta)
- property(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]]
- }
- }
-
- override def property[V](key: String, value: V): Property[V] = {
- property(key, value, System.currentTimeMillis())
- }
-
- def property[V](key: String, value: V, ts: Long): Property[V] = {
- val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
- val newProp = new S2Property[V](this, labelMeta, key, value, ts)
- propsWithTs.put(key, newProp)
- newProp
- }
-
- override def remove(): Unit = {}
-
- override def graph(): TpGraph = innerGraph
-
- override def id(): AnyRef = (srcVertex.innerId, labelWithDir, tgtVertex.innerId)
-
- override def label(): String = innerLabel.label
-}
-
-
-case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
- edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
- newSnapshotEdge: Option[SnapshotEdge] = None) {
-
- def toLogString: String = {
- val l = (0 until 50).map(_ => "-").mkString("")
- val deletes = s"deletes: ${edgesToDelete.map(e => e.toLogString).mkString("\n")}"
- val inserts = s"inserts: ${edgesToInsert.map(e => e.toLogString).mkString("\n")}"
- val updates = s"snapshot: ${newSnapshotEdge.map(e => e.toLogString).mkString("\n")}"
-
- List("\n", l, deletes, inserts, updates, l, "\n").mkString("\n")
- }
-}
-
-object Edge {
- val incrementVersion = 1L
- val minTsVal = 0L
-
- /** now version information is required also **/
- type Props = java.util.Map[String, S2Property[_]]
- type State = Map[LabelMeta, InnerValLikeWithTs]
- type PropsPairWithTs = (State, State, Long, String)
- type MergeState = PropsPairWithTs => (State, Boolean)
- type UpdateFunc = (Option[Edge], Edge, MergeState)
-
- def EmptyProps = new java.util.HashMap[String, S2Property[_]]
- def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs]
- def sameProps(base: Props, other: Props): Boolean = {
- if (base.size != other.size) false
- else {
- var ret = true
- val iter = base.entrySet().iterator()
- while (iter.hasNext) {
- val e = iter.next()
- if (!other.containsKey(e.getKey)) ret = false
- else if (e.getValue != other.get(e.getKey)) ret = false
- else {
-
- }
- }
- val otherIter = other.entrySet().iterator()
- while (otherIter.hasNext) {
- val e = otherIter.next()
- if (!base.containsKey(e.getKey)) ret = false
- else if (e.getValue != base.get(e.getKey)) ret = false
- else {
-
- }
- }
- ret
- }
-// base.sameElements(other)
- }
- def fillPropsWithTs(snapshotEdge: SnapshotEdge, state: State): Unit = {
- state.foreach { case (k, v) => snapshotEdge.property(k.name, v.innerVal.value, v.ts) }
- }
- def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = {
- state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) }
- }
- def fillPropsWithTs(edge: Edge, state: State): Unit = {
- state.foreach { case (k, v) => edge.property(k.name, v.innerVal.value, v.ts) }
- }
-
- def propsToState(props: Props): State = {
- props.asScala.map { case (k, v) =>
- v.labelMeta -> v.innerValWithTs
- }.toMap
- }
-
- def stateToProps(edge: Edge, state: State): Props = {
- state.foreach { case (k, v) =>
- edge.property(k.name, v.innerVal.value, v.ts)
- }
- edge.propsWithTs
- }
-
- def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean =
- if (!props.contains(LabelMeta.lastDeletedAt)) false
- else {
- val lastDeletedAt = props.get(LabelMeta.lastDeletedAt).get.ts
- val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt
-
- propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
- }
-
- def allPropsDeleted(props: Props): Boolean =
- if (!props.containsKey(LabelMeta.lastDeletedAt.name)) false
- else {
- val lastDeletedAt = props.get(LabelMeta.lastDeletedAt.name).ts
- props.remove(LabelMeta.lastDeletedAt.name)
-// val propsWithoutLastDeletedAt = props
-//
-// propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
- var ret = true
- val iter = props.entrySet().iterator()
- while (iter.hasNext) {
- val e = iter.next()
- if (e.getValue.ts > lastDeletedAt) ret = false
- }
- ret
- }
-
- def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, EdgeMutate) = {
- // assert(invertedEdge.isEmpty)
- // assert(requestEdge.op == GraphUtil.operations("delete"))
-
- val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
- val edgeInverted = Option(requestEdge.toSnapshotEdge)
-
- (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, newSnapshotEdge = edgeInverted))
- }
-
- def buildOperation(invertedEdge: Option[Edge], requestEdges: Seq[Edge]): (Edge, EdgeMutate) = {
- // logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
- // logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
- val oldPropsWithTs =
- if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs]
- else propsToState(invertedEdge.get.propsWithTs)
-
- val funcs = requestEdges.map { edge =>
- if (edge.op == GraphUtil.operations("insert")) {
- edge.innerLabel.consistencyLevel match {
- case "strong" => Edge.mergeUpsert _
- case _ => Edge.mergeInsertBulk _
- }
- } else if (edge.op == GraphUtil.operations("insertBulk")) {
- Edge.mergeInsertBulk _
- } else if (edge.op == GraphUtil.operations("delete")) {
- edge.innerLabel.consistencyLevel match {
- case "strong" => Edge.mergeDelete _
- case _ => throw new RuntimeException("not supported")
- }
- }
- else if (edge.op == GraphUtil.operations("update")) Edge.mergeUpdate _
- else if (edge.op == GraphUtil.operations("increment")) Edge.mergeIncrement _
- else throw new RuntimeException(s"not supported operation on edge: $edge")
- }
-
- val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal)
- val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != _._1.ts).sortBy(_._1.ts)
-
- if (requestWithFuncs.isEmpty) {
- (requestEdges.head, EdgeMutate())
- } else {
- val requestEdge = requestWithFuncs.last._1
- var prevPropsWithTs = oldPropsWithTs
-
- for {
- (requestEdge, func) <- requestWithFuncs
- } {
- val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer)
- prevPropsWithTs = _newPropsWithTs
- // logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
- }
- val requestTs = requestEdge.ts
- /** version should be monotoniously increasing so our RPC mutation should be applied safely */
- val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
- val maxTs = prevPropsWithTs.map(_._2.ts).max
- val newTs = if (maxTs > requestTs) maxTs else requestTs
- val propsWithTs = prevPropsWithTs ++
- Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.innerLabel.schemaVersion), newTs))
-
- val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)
-
- // logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
- // logger.error(s"$propsWithTs")
- val newEdge = requestEdge.copy(propsWithTs = EmptyProps)
- fillPropsWithTs(newEdge, propsWithTs)
- (newEdge, edgeMutate)
- }
- }
-
- def filterOutWithLabelOption(ls: Seq[IndexEdge]): Seq[IndexEdge] = ls.filter { ie =>
- ie.labelIndex.dir match {
- case None =>
- // both direction use same indices that is defined when label creation.
- true
- case Some(dir) =>
- if (dir != ie.dir) {
- // current labelIndex's direction is different with indexEdge's direction so don't touch
- false
- } else {
- ie.labelIndex.writeOption.map { option =>
- val hashValueOpt = ie.orders.find { case (k, v) => k == LabelMeta.fromHash }.map{ case (k, v) => v.value.toString.toLong }
- option.sample(ie, hashValueOpt)
- }.getOrElse(true)
- }
- }
- }
-
- def buildMutation(snapshotEdgeOpt: Option[Edge],
- requestEdge: Edge,
- newVersion: Long,
- oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs],
- newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = {
-
- if (oldPropsWithTs == newPropsWithTs) {
- // all requests should be dropped. so empty mutation.
- EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None)
- } else {
- val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq)
- val newOp = snapshotEdgeOpt match {
- case None => requestEdge.op
- case Some(old) =>
- val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max
- if (oldMaxTs > requestEdge.ts) old.op
- else requestEdge.op
- }
-
- val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs)
-
- val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge)
- // delete request must always update snapshot.
- if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) {
- // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt.
- EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt)
- } else {
- val edgesToDelete = snapshotEdgeOpt match {
- case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
- snapshotEdge.copy(op = GraphUtil.defaultOpByte)
- .relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
- case _ => Nil
- }
-
- val edgesToInsert =
- if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
- else {
- val newEdge = requestEdge.copy(
- version = newVersion,
- propsWithTs = Edge.EmptyProps,
- op = GraphUtil.defaultOpByte
- )
- newPropsWithTs.foreach { case (k, v) => newEdge.property(k.name, v.innerVal.value, v.ts) }
-
- newEdge.relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
- }
-
-
- EdgeMutate(edgesToDelete = edgesToDelete,
- edgesToInsert = edgesToInsert,
- newSnapshotEdge = newSnapshotEdgeOpt)
- }
- }
- }
-
- def mergeUpsert(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
- var shouldReplace = false
- val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
- val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
- val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
- propsWithTs.get(k) match {
- case Some(newValWithTs) =>
- assert(oldValWithTs.ts >= lastDeletedAt)
- val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
- else {
- shouldReplace = true
- newValWithTs
- }
- Some(k -> v)
-
- case None =>
- assert(oldValWithTs.ts >= lastDeletedAt)
- if (oldValWithTs.ts >= requestTs || k.seq < 0) Some(k -> oldValWithTs)
- else {
- shouldReplace = true
- None
- }
- }
- }
- val existInNew =
- for {
- (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
- } yield {
- shouldReplace = true
- Some(k -> newValWithTs)
- }
-
- ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
- }
-
- def mergeUpdate(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
- var shouldReplace = false
- val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
- val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
- val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
- propsWithTs.get(k) match {
- case Some(newValWithTs) =>
- assert(oldValWithTs.ts >= lastDeletedAt)
- val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
- else {
- shouldReplace = true
- newValWithTs
- }
- Some(k -> v)
- case None =>
- // important: update need to merge previous valid values.
- assert(oldValWithTs.ts >= lastDeletedAt)
- Some(k -> oldValWithTs)
- }
- }
- val existInNew = for {
- (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
- } yield {
- shouldReplace = true
- Some(k -> newValWithTs)
- }
-
- ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
- }
-
- def mergeIncrement(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
- var shouldReplace = false
- val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
- val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
- val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
- propsWithTs.get(k) match {
- case Some(newValWithTs) =>
- if (k == LabelMeta.timestamp) {
- val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
- else {
- shouldReplace = true
- newValWithTs
- }
- Some(k -> v)
- } else {
- if (oldValWithTs.ts >= newValWithTs.ts) {
- Some(k -> oldValWithTs)
- } else {
- assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= lastDeletedAt)
- shouldReplace = true
- // incr(t0), incr(t2), d(t1) => deleted
- Some(k -> InnerValLikeWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts))
- }
- }
-
- case None =>
- assert(oldValWithTs.ts >= lastDeletedAt)
- Some(k -> oldValWithTs)
- // if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None
- }
- }
- val existInNew = for {
- (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
- } yield {
- shouldReplace = true
- Some(k -> newValWithTs)
- }
-
- ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
- }
-
- def mergeDelete(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
- var shouldReplace = false
- val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
- val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match {
- case Some(prevDeletedAt) =>
- if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts
- else {
- shouldReplace = true
- requestTs
- }
- case None => {
- shouldReplace = true
- requestTs
- }
- }
- val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
- if (k == LabelMeta.timestamp) {
- if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs)
- else {
- shouldReplace = true
- Some(k -> InnerValLikeWithTs.withLong(requestTs, requestTs, version))
- }
- } else {
- if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs)
- else {
- shouldReplace = true
- None
- }
- }
- }
- val mustExistInNew = Map(LabelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(lastDeletedAt, lastDeletedAt, version))
- ((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace)
- }
-
- def mergeInsertBulk(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
- val (_, propsWithTs, _, _) = propsPairWithTs
- (propsWithTs, true)
- }
-
-// def fromString(s: String): Option[Edge] = Graph.toEdge(s)
-
-
-}