You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 16:33:39 UTC
[6/7] incubator-s2graph git commit: [S2GRAPH-122]: Change data types
of Edge/IndexEdge/SnapshotEdge.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index 2cbddea..ca32a14 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -19,25 +19,31 @@
package org.apache.s2graph.core
-import java.util
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Executors
import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.hadoop.fs.Path
+import org.apache.s2graph.core.GraphExceptions.{FetchAllStepFailException, FetchTimeoutException, LabelNotExistException}
import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{Label, Model}
-import org.apache.s2graph.core.parsers.WhereParser
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Model, Service}
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
-import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
-import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.storage.{SKeyValue, Storage}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.utils.{DeferCache, Extensions, SafeUpdateCache, logger}
import play.api.libs.json.{JsObject, Json}
+import scala.annotation.tailrec
import scala.collection.JavaConversions._
-import scala.collection._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.concurrent._
-import scala.util.Try
+import scala.util.{Random, Try}
object Graph {
+
+ type HashKey = (Int, Int, Int, Int, Boolean)
+ type FilterHashKey = (Int, Int)
+
+
val DefaultScore = 1.0
private val DefaultConfigs: Map[String, AnyRef] = Map(
@@ -65,26 +71,130 @@ object Graph {
"future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
"future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
+ "query.future.cache.max.size" -> java.lang.Integer.valueOf(1000),
+ "query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
+ "query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
+ "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
"s2graph.storage.backend" -> "hbase",
- "query.hardlimit" -> java.lang.Integer.valueOf(100000)
+ "query.hardlimit" -> java.lang.Integer.valueOf(100000),
+ "hbase.zookeeper.znode.parent" -> "/hbase",
+ "query.log.sample.rate" -> Double.box(0.05)
)
var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
- /** helpers for filterEdges */
- type HashKey = (Int, Int, Int, Int, Boolean)
- type FilterHashKey = (Int, Int)
- type Result = (ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
- ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
- ListBuffer[(HashKey, FilterHashKey, Edge, Double)])
+ /**
+ * Parses one delimited log line into a graph element.
+ *
+ * The line is split with `GraphUtil.split`; field 2 selects the element kind
+ * ("edge"/"e" or "vertex"/"v"). For edge lines, field 5 (the label) is overwritten
+ * with its `labelMapping` entry when one exists, then the fields are handed to
+ * `toEdge`; vertex lines go to `toVertex`. Any `Exception` raised on the way,
+ * including the `JsonParseException` thrown for an unknown log type, is logged
+ * and turned into `None`.
+ *
+ * @param s raw log line
+ * @param labelMapping label-name substitutions applied to edge lines (default: none)
+ * @return the parsed element, or `None` when parsing failed
+ */
+ def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
+ val parts = GraphUtil.split(s)
+ val logType = parts(2)
+ val element = if (logType == "edge" | logType == "e") {
+ /** currently, only edges are considered to be bulk loaded */
+ labelMapping.get(parts(5)) match {
+ case None =>
+ case Some(toReplace) =>
+ parts(5) = toReplace
+ }
+ toEdge(parts)
+ } else if (logType == "vertex" | logType == "v") {
+ toVertex(parts)
+ } else {
+ throw new GraphExceptions.JsonParseException("log type is not exist in log.")
+ }
- def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
- val src = edge.srcVertex.innerId.hashCode()
- val tgt = edge.tgtVertex.innerId.hashCode()
- val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
- val filterHashKey = (src, tgt)
+ element
+ } recover {
+ // failures from toEdge/toVertex surface here as well, since both rethrow after logging
+ case e: Exception =>
+ logger.error(s"[toElement]: $e", e)
+ None
+ } get
- (hashKey, filterHashKey)
+
+  /** Splits a raw log line with `GraphUtil.split` and parses the fields as a vertex. */
+  def toVertex(s: String): Option[Vertex] = {
+    val fields = GraphUtil.split(s)
+    toVertex(fields)
+  }
+
+  /** Splits a raw log line with `GraphUtil.split` and parses the fields as an edge. */
+  def toEdge(s: String): Option[Edge] = {
+    val fields = GraphUtil.split(s)
+    toEdge(fields)
+  }
+
+  /**
+   * Builds an edge from already-split log fields.
+   *
+   * Field layout: ts, operation, logType, srcId, tgtId, label, then an optional
+   * JSON properties object (index 6) and an optional direction (index 7).
+   * A missing direction, or one other than "out"/"in", is treated as "out".
+   * Failures are logged and rethrown to the caller.
+   *
+   * @param parts split log fields; at least six entries are required
+   * @return `Option` wrapping whatever `Edge.toEdge` produced
+   */
+  def toEdge(parts: Array[String]): Option[Edge] = Try {
+    val ts = parts(0)
+    val operation = parts(1)
+    val logType = parts(2) // not used below; read only for its bounds check
+    val srcId = parts(3)
+    val tgtId = parts(4)
+    val label = parts(5)
+
+    val props =
+      if (parts.length < 7) Map.empty[String, Any]
+      else fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj()))
+
+    val direction = (if (parts.length >= 8) parts(7) else "out") match {
+      case dir @ ("out" | "in") => dir
+      case _ => "out"
+    }
+
+    Option(Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation))
+  }.recover {
+    case e: Exception =>
+      logger.error(s"[toEdge]: $e", e)
+      throw e
+  }.get
+
+  /**
+   * Builds a vertex from already-split log fields.
+   *
+   * Field layout: ts, operation, logType, srcId, serviceName, columnName, then
+   * an optional JSON properties object (index 6). Failures are logged and
+   * rethrown to the caller.
+   *
+   * @param parts split log fields; at least six entries are required
+   * @return `Option` wrapping whatever `Vertex.toVertex` produced
+   */
+  def toVertex(parts: Array[String]): Option[Vertex] = Try {
+    val ts = parts(0)
+    val operation = parts(1)
+    val logType = parts(2) // not used below; read only for its bounds check
+    val srcId = parts(3)
+    val serviceName = parts(4)
+    val colName = parts(5)
+
+    val props =
+      if (parts.length < 7) Map.empty[String, Any]
+      else fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj()))
+
+    Option(Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation))
+  }.recover {
+    case e: Throwable =>
+      logger.error(s"[toVertex]: $e", e)
+      throw e
+  }.get
+
+  /**
+   * Creates the storage implementation named by the `s2graph.storage.backend` setting.
+   *
+   * Only "hbase" is recognised; any other value raises a `RuntimeException`.
+   *
+   * @param graph  graph instance handed to the storage constructor
+   * @param config configuration the backend name is read from and the storage is built with
+   * @param ec     execution context handed to the storage constructor
+   */
+  def initStorage(graph: Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = {
+    val backendName = config.getString("s2graph.storage.backend")
+    logger.info(s"[InitStorage]: $backendName")
+
+    if (backendName == "hbase") new AsynchbaseStorage(graph, config)(ec)
+    else throw new RuntimeException("not supported storage.")
+  }
+
+  /**
+   * Extracts the settings whose key starts with `prefix` into a new `Config`,
+   * with that leading prefix removed from each key.
+   *
+   * Only the leading occurrence is stripped: with prefix "cache." the key
+   * "cache.query.cache.size" becomes "query.cache.size".
+   *
+   * @param config source configuration
+   * @param prefix key prefix selecting the entries to keep
+   * @return configuration holding the selected entries under their shortened keys
+   */
+  def parseCacheConfig(config: Config, prefix: String): Config = {
+    import scala.collection.JavaConverters._
+
+    val kvs = new java.util.HashMap[String, AnyRef]()
+    config.entrySet().asScala.foreach { entry =>
+      val key = entry.getKey
+      if (key.startsWith(prefix)) {
+        // stripPrefix (rather than replace) leaves later occurrences of the prefix text intact
+        kvs.put(key.stripPrefix(prefix), entry.getValue.unwrapped())
+      }
+    }
+    ConfigFactory.parseMap(kvs)
+  }
+
+  /** Global helper functions */
+
+  /**
+   * Draws distinct random integers from `[0, range)` until `sampleNumber` of them
+   * have been collected.
+   *
+   * Returns `set` unchanged when `range < sampleNumber` (not enough distinct values
+   * exist) or when `set` already holds at least `sampleNumber` elements, which
+   * also covers a zero or negative `sampleNumber`.
+   *
+   * @param sampleNumber how many distinct values are wanted
+   * @param range        exclusive upper bound of the values drawn
+   * @param set          values collected so far
+   * @return a set of `sampleNumber` distinct values, or `set` as described above
+   */
+  @tailrec
+  final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
+    // `>=` rather than `==`: with `==` a negative sampleNumber or an oversized seed set never terminates
+    if (range < sampleNumber || set.size >= sampleNumber) set
+    else randomInt(sampleNumber, range, set + Random.nextInt(range))
+  }
+
+  /**
+   * Picks `n` edges at random positions from `edges`.
+   *
+   * When `edges` holds `n` elements or fewer it is returned as is. Otherwise the
+   * first element is skipped if the query offset is 0 (presumably because it is
+   * not a plain edge; confirm against the caller), `n` random positions are drawn
+   * with `randomInt`, and the edges at those positions are returned in reverse
+   * of their input order.
+   *
+   * @param queryRequest request whose `queryParam.offset` decides whether the head is skipped
+   * @param edges        candidate edges
+   * @param n            number of edges to pick
+   */
+  def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
+    if (edges.size <= n) edges
+    else {
+      val candidates = if (queryRequest.queryParam.offset == 0) edges.tail else edges
+      val pickedPositions = randomInt(n, candidates.size)
+
+      // prepending while walking left-to-right yields the picks in reverse input order
+      candidates.zipWithIndex.foldLeft(List.empty[EdgeWithScore]) { case (acc, (edgeWithScore, position)) =>
+        if (pickedPositions.contains(position)) edgeWithScore :: acc else acc
+      }
+    }
+  }
+
+  /**
+   * Rescales the scores so that they sum to 1 by dividing each by the total.
+   *
+   * When the total is 0.0 (empty input, or scores that cancel out) the input is
+   * returned unchanged, because dividing would turn every score into NaN or Infinity.
+   *
+   * @param edgeWithScores edges whose scores are to be normalized
+   * @return the edges with rescaled scores, or the input itself when the total is 0.0
+   */
+  def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+    val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
+    if (sum == 0.0) edgeWithScores
+    else edgeWithScores.map { edgeWithScore =>
+      edgeWithScore.copy(score = edgeWithScore.score / sum)
+    }
   }
def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, Vertex), Boolean] = {
@@ -100,7 +210,7 @@ object Graph {
/** common methods for filter out, transform, aggregate queryResult */
def convertEdges(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = {
for {
- convertedEdge <- queryParam.transformer.transform(edge, nextStepOpt) if !edge.isDegree
+ convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
} yield convertedEdge
}
@@ -110,18 +220,17 @@ object Graph {
case None => 1.0
case Some(timeDecay) =>
val tsVal = try {
- val labelMeta = edge.label.metaPropsMap(timeDecay.labelMetaSeq)
- val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMetaSeq)
+ val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMeta)
innerValWithTsOpt.map { innerValWithTs =>
val innerVal = innerValWithTs.innerVal
- labelMeta.dataType match {
+ timeDecay.labelMeta.dataType match {
case InnerVal.LONG => innerVal.value match {
case n: BigDecimal => n.bigDecimal.longValue()
case _ => innerVal.toString().toLong
}
case _ => innerVal.toString().toLong
}
- } getOrElse(edge.ts)
+ } getOrElse (edge.ts)
} catch {
case e: Exception =>
logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
@@ -134,264 +243,891 @@ object Graph {
tsVal
}
- def processDuplicates(queryParam: QueryParam,
- duplicates: Seq[(FilterHashKey, EdgeWithScore)]): Seq[(FilterHashKey, EdgeWithScore)] = {
+ /**
+ * Collapses entries that share a hash key according to the query's duplicate policy.
+ *
+ * Applies only when the label's consistency level is not "strong"; otherwise
+ * `duplicates` is returned unchanged. Per policy:
+ *  - `First`: keeps the head entry only.
+ *  - `Raw`: keeps every entry.
+ *  - `CountSum`: keeps the head entry, scored with the number of duplicates.
+ *  - anything else: keeps the head entry, scored with the sum of all scores.
+ *
+ * Every policy except `Raw` reads `duplicates.head`, so `duplicates` must be non-empty.
+ *
+ * @param queryParam supplies the label's consistency level and the duplicate policy
+ * @param duplicates entries sharing one hash key, in arrival order
+ * @param ev         reads and rewrites the score of a `T`
+ */
+ def processDuplicates[T](queryParam: QueryParam,
+ duplicates: Seq[(FilterHashKey, T)])(implicit ev: WithScore[T]): Seq[(FilterHashKey, T)] = {
if (queryParam.label.consistencyLevel != "strong") {
//TODO:
queryParam.duplicatePolicy match {
- case Query.DuplicatePolicy.First => Seq(duplicates.head)
- case Query.DuplicatePolicy.Raw => duplicates
- case Query.DuplicatePolicy.CountSum =>
+ case DuplicatePolicy.First => Seq(duplicates.head)
+ case DuplicatePolicy.Raw => duplicates
+ case DuplicatePolicy.CountSum =>
val countSum = duplicates.size
- Seq(duplicates.head._1 -> duplicates.head._2.copy(score = countSum))
+ val (headFilterHashKey, headEdgeWithScore) = duplicates.head
+ Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum))
case _ =>
- val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + current._2.score }
- Seq(duplicates.head._1 -> duplicates.head._2.copy(score = scoreSum))
+ val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) }
+ val (headFilterHashKey, headEdgeWithScore) = duplicates.head
+ Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum))
}
} else {
duplicates
}
}
+
+  /**
+   * Derives the two keys used to de-duplicate and filter an edge.
+   *
+   * The `HashKey` is (source hash, label id, direction, target hash, isDegree);
+   * the `FilterHashKey` is (source hash, target hash). Both hashes come from the
+   * vertices' `innerId`. `queryParam` is not consulted.
+   *
+   * @param queryParam unused by the computation
+   * @param edge       edge to derive the keys from
+   * @param isDegree   stored as the last component of the `HashKey`
+   */
+  def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
+    val srcHash = edge.srcVertex.innerId.hashCode()
+    val tgtHash = edge.tgtVertex.innerId.hashCode()
+    val labelWithDir = edge.labelWithDir
+
+    ((srcHash, labelWithDir.labelId, labelWithDir.dir, tgtHash, isDegree), (srcHash, tgtHash))
+  }
+
def filterEdges(q: Query,
stepIdx: Int,
queryRequests: Seq[QueryRequest],
- queryResultLsFuture: Future[Seq[StepInnerResult]],
+ queryResultLsFuture: Future[Seq[StepResult]],
queryParams: Seq[QueryParam],
- alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean])
- (implicit ec: scala.concurrent.ExecutionContext): Future[StepInnerResult] = {
+ alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty,
+ buildLastStepInnerResult: Boolean = true,
+ parentEdges: Map[VertexId, Seq[EdgeWithScore]])
+ (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = {
queryResultLsFuture.map { queryRequestWithResultLs =>
- if (queryRequestWithResultLs.isEmpty) StepInnerResult.Empty
+ val (cursors, failCount) = {
+ val _cursors = ArrayBuffer.empty[Array[Byte]]
+ var _failCount = 0
+
+ queryRequestWithResultLs.foreach { stepResult =>
+ _cursors.append(stepResult.cursors: _*)
+ _failCount += stepResult.failCount
+ }
+
+ _cursors -> _failCount
+ }
+
+
+ if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount)
else {
+ val isLastStep = stepIdx == q.steps.size - 1
+ val queryOption = q.queryOption
val step = q.steps(stepIdx)
- val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None
-
- val excludeLabelWithDirSet = new util.HashSet[(Int, Int)]
- val includeLabelWithDirSet = new util.HashSet[(Int, Int)]
- step.queryParams.filter(_.exclude).foreach(l => excludeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir))
- step.queryParams.filter(_.include).foreach(l => includeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir))
-
- val edgesToExclude = new util.HashSet[FilterHashKey]()
- val edgesToInclude = new util.HashSet[FilterHashKey]()
-
- val sequentialLs = new ListBuffer[(HashKey, FilterHashKey, EdgeWithScore)]()
- val agg = new mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, EdgeWithScore)]]()
- val params = new mutable.HashMap[HashKey, QueryParam]()
- var numOfDuplicates = 0
- var numOfTotal = 0
- queryRequests.zip(queryRequestWithResultLs).foreach { case (queryRequest, stepInnerResult) =>
- val queryParam = queryRequest.queryParam
- val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
- val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir
- val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey)
- val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey)
- val where = queryParam.where.get
-
- for {
- edgeWithScore <- stepInnerResult.edgesWithScoreLs
- if where == WhereParser.success || where.filter(edgeWithScore.edge)
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- } {
- numOfTotal += 1
- if (queryParam.transformer.isDefault) {
- val convertedEdge = edge
-
- val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
-
- /** check if this edge should be exlcuded. */
- if (shouldBeExcluded) {
- edgesToExclude.add(filterHashKey)
- } else {
- if (shouldBeIncluded) {
- edgesToInclude.add(filterHashKey)
- }
- val tsVal = processTimeDecay(queryParam, convertedEdge)
- val newScore = labelWeight * score * tsVal
- val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
- sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
- agg.get(hashKey) match {
- case None =>
- val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
- newLs += (filterHashKey -> newEdgeWithScore)
- agg += (hashKey -> newLs)
- case Some(old) =>
- numOfDuplicates += 1
- old += (filterHashKey -> newEdgeWithScore)
- }
- params += (hashKey -> queryParam)
+ val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs)
+ val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult
+ val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
+
+ if (shouldBuildInnerResults) {
+ val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
+ edgeWithScore
+ }
+
+ /** process step group by */
+ val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
+ StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
+
+ } else {
+ val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
+ val edge = edgeWithScore.edge
+ val score = edgeWithScore.score
+ val label = edgeWithScore.label
+
+ /** Select */
+ val mergedPropsWithTs =
+ if (queryOption.selectColumns.isEmpty) {
+ label.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
+ labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultVal)
}
} else {
- convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge =>
- val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
-
- /** check if this edge should be exlcuded. */
- if (shouldBeExcluded) {
- edgesToExclude.add(filterHashKey)
- } else {
- if (shouldBeIncluded) {
- edgesToInclude.add(filterHashKey)
- }
- val tsVal = processTimeDecay(queryParam, convertedEdge)
- val newScore = labelWeight * score * tsVal
- val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
- sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
- agg.get(hashKey) match {
- case None =>
- val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
- newLs += (filterHashKey -> newEdgeWithScore)
- agg += (hashKey -> newLs)
- case Some(old) =>
- numOfDuplicates += 1
- old += (filterHashKey -> newEdgeWithScore)
- }
- params += (hashKey -> queryParam)
- }
+ val initial = Map(LabelMeta.timestamp -> edge.propsWithTs(LabelMeta.timestamp))
+ propsSelectColumns.foldLeft(initial) { case (prev, labelMeta) =>
+ prev + (labelMeta -> edge.propsWithTs.getOrElse(labelMeta, label.metaPropsDefaultMapInner(labelMeta)))
}
}
- }
- }
+ val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+ val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
+ /** OrderBy */
+ val orderByValues =
+ if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
+ else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
+
+ /** StepGroupBy */
+ val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
+
+ /** GroupBy */
+ val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
+
+ /** FilterOut */
+ val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
- val edgeWithScoreLs = new ListBuffer[EdgeWithScore]()
- if (numOfDuplicates == 0) {
- // no duplicates at all.
- for {
- (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
- if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
- } {
- edgeWithScoreLs += edgeWithScore
+ newEdgeWithScore.copy(orderByValues = orderByValues,
+ stepGroupByValues = stepGroupByValues,
+ groupByValues = groupByValues,
+ filterOutValues = filterOutValues)
}
- } else {
- // need to resolve duplicates.
- val seen = new mutable.HashSet[HashKey]()
- for {
- (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
- if !seen.contains(hashKey)
- if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
- } {
- val queryParam = params(hashKey)
- val duplicates = processDuplicates(queryParam, agg(hashKey))
- duplicates.foreach { case (_, duplicate) =>
- if (duplicate.score >= queryParam.threshold) {
- seen += hashKey
- edgeWithScoreLs += duplicate
+
+ /** process step group by */
+ val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
+
+ /** process ordered list */
+ val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
+
+ /** process grouped list */
+ val grouped =
+ if (queryOption.groupBy.keys.isEmpty) Nil
+ else {
+ val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]()
+ results.groupBy { edgeWithScore =>
+ // edgeWithScore.groupByValues.map(_.map(_.toString))
+ edgeWithScore.groupByValues
+ }.foreach { case (k, ls) =>
+ val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption)
+
+ val newScoreSum = scoreSum
+
+ /**
+ * watch out here. by calling toString on Any, we lose type information which will be used
+ * later for toJson.
+ */
+ if (merged.nonEmpty) {
+ val newKey = merged.head.groupByValues
+ agg += (newKey -> (newScoreSum, merged))
}
}
+ agg.toSeq.sortBy(_._2._1 * -1)
}
- }
- val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
- StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, degreeEdges = degrees)
+ StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount)
+ }
}
}
}
- def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
- val parts = GraphUtil.split(s)
- val logType = parts(2)
- val element = if (logType == "edge" | logType == "e") {
- /* current only edge is considered to be bulk loaded */
- labelMapping.get(parts(5)) match {
- case None =>
- case Some(toReplace) =>
- parts(5) = toReplace
+ private def toEdgeWithScores(queryRequest: QueryRequest,
+ stepResult: StepResult,
+ parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = {
+ val queryOption = queryRequest.query.queryOption
+ val queryParam = queryRequest.queryParam
+ val prevScore = queryRequest.prevStepScore
+ val labelWeight = queryRequest.labelWeight
+ val edgeWithScores = stepResult.edgeWithScores
+
+ val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
+ val parents = if (shouldBuildParents) {
+ parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore =>
+ val edge = edgeWithScore.edge
+ val score = edgeWithScore.score
+ val label = edgeWithScore.label
+
+ /** Select */
+ val mergedPropsWithTs =
+ if (queryOption.selectColumns.isEmpty) {
+ label.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
+ labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultVal)
+ }
+ } else {
+ val initial = Map(LabelMeta.timestamp -> edge.propsWithTs(LabelMeta.timestamp))
+ queryOption.selectColumns.foldLeft(initial) { case (acc, labelMetaName) =>
+ label.metaPropsDefaultMapInnerString.get(labelMetaName) match {
+ case None => acc
+ case Some(defaultValue) =>
+ val labelMeta = label.metaPropsInvMap(labelMetaName)
+ acc + (labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultValue))
+ }
+ }
+ }
+
+ val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+ edgeWithScore.copy(edge = newEdge)
}
- toEdge(parts)
- } else if (logType == "vertex" | logType == "v") {
- toVertex(parts)
- } else {
- throw new GraphExceptions.JsonParseException("log type is not exist in log.")
- }
+ } else Nil
- element
- } recover {
- case e: Exception =>
- logger.error(s"[toElement]: $e", e)
- None
- } get
+ // skip
+ if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores
+ else {
+ val degreeScore = 0.0
+ val sampled =
+ if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+ else edgeWithScores
- def toVertex(s: String): Option[Vertex] = {
- toVertex(GraphUtil.split(s))
- }
+ val withScores = for {
+ edgeWithScore <- sampled
+ } yield {
+ val edge = edgeWithScore.edge
+ val edgeScore = edgeWithScore.score
+ val score = queryParam.scorePropagateOp match {
+ case "plus" => edgeScore + prevScore
+ case "divide" =>
+ if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+ else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
+ case _ => edgeScore * prevScore
+ }
- def toEdge(s: String): Option[Edge] = {
- toEdge(GraphUtil.split(s))
+ val tsVal = processTimeDecay(queryParam, edge)
+ val newScore = degreeScore + score
+ // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
+ val newEdge = edge.copy(parentEdges = parents)
+ edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
+ }
+
+ val normalized =
+ if (queryParam.shouldNormalize) normalize(withScores)
+ else withScores
+
+ normalized
+ }
}
- def toEdge(parts: Array[String]): Option[Edge] = Try {
- val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
- val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
- val tempDirection = if (parts.length >= 8) parts(7) else "out"
- val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
- val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
- Option(edge)
- } recover {
- case e: Exception =>
- logger.error(s"[toEdge]: $e", e)
- throw e
- } get
+ private def buildResult[T](query: Query,
+ stepIdx: Int,
+ stepResultLs: Seq[(QueryRequest, StepResult)],
+ parentEdges: Map[VertexId, Seq[EdgeWithScore]])
+ (createFunc: (EdgeWithScore, Set[LabelMeta]) => T)
+ (implicit ev: WithScore[T]): ListBuffer[T] = {
+ import scala.collection._
- def toVertex(parts: Array[String]): Option[Vertex] = Try {
- val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
- val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
- val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
- Option(vertex)
- } recover {
- case e: Throwable =>
- logger.error(s"[toVertex]: $e", e)
- throw e
- } get
+ val results = ListBuffer.empty[T]
+ val sequentialLs: ListBuffer[(HashKey, FilterHashKey, T, QueryParam)] = ListBuffer.empty
+ val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, T)]] = mutable.HashMap.empty
+ val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
+ val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
- def initStorage(graph: Graph, config: Config)(ec: ExecutionContext) = {
- config.getString("s2graph.storage.backend") match {
- case "hbase" => new AsynchbaseStorage(graph, config)(ec)
- case _ => throw new RuntimeException("not supported storage.")
+ var numOfDuplicates = 0
+ val queryOption = query.queryOption
+ val step = query.steps(stepIdx)
+ val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet
+ val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet
+
+ stepResultLs.foreach { case (queryRequest, stepInnerResult) =>
+ val queryParam = queryRequest.queryParam
+ val label = queryParam.label
+ val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir)
+ val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir)
+
+ val propsSelectColumns = (for {
+ column <- queryOption.propsSelectColumns
+ labelMeta <- label.metaPropsInvMap.get(column)
+ } yield labelMeta).toSet
+
+ for {
+ edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges)
+ } {
+ val edge = edgeWithScore.edge
+ val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
+ // params += (hashKey -> queryParam) //
+
+ /** check if this edge should be excluded. */
+ if (shouldBeExcluded) {
+ edgesToExclude.add(filterHashKey)
+ } else {
+ if (shouldBeIncluded) {
+ edgesToInclude.add(filterHashKey)
+ }
+ val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns)
+
+ sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
+ duplicates.get(hashKey) match {
+ case None =>
+ val newLs = ListBuffer.empty[(FilterHashKey, T)]
+ newLs += (filterHashKey -> newEdgeWithScore)
+ duplicates += (hashKey -> newLs) //
+ case Some(old) =>
+ numOfDuplicates += 1
+ old += (filterHashKey -> newEdgeWithScore) //
+ }
+ }
+ }
}
+
+
+ if (numOfDuplicates == 0) {
+ // no duplicates at all.
+ for {
+ (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs
+ if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+ } {
+ results += edgeWithScore
+ }
+ } else {
+ // need to resolve duplicates.
+ val seen = new mutable.HashSet[HashKey]()
+ for {
+ (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs
+ if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+ if !seen.contains(hashKey)
+ } {
+ // val queryParam = params(hashKey)
+ processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) =>
+ if (ev.score(duplicate) >= queryParam.threshold) {
+ seen += hashKey
+ results += duplicate
+ }
+ }
+ }
+ }
+ results
}
+
}
class Graph(_config: Config)(implicit val ec: ExecutionContext) {
+
+ import Graph._
+
val config = _config.withFallback(Graph.DefaultConfig)
Model.apply(config)
Model.loadCache()
- // TODO: Make storage client by config param
- val storage = Graph.initStorage(this, config)(ec)
+ val MaxRetryNum = config.getInt("max.retry.number")
+ val MaxBackOff = config.getInt("max.back.off")
+ val BackoffTimeout = config.getInt("back.off.timeout")
+ val DeleteAllFetchCount = config.getInt("delete.all.fetch.count")
+ val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
+ val FailProb = config.getDouble("hbase.fail.prob")
+ val LockExpireDuration = config.getInt("lock.expire.time")
+ val MaxSize = config.getInt("future.cache.max.size")
+ val ExpireAfterWrite = config.getInt("future.cache.expire.after.write")
+ val ExpireAfterAccess = config.getInt("future.cache.expire.after.access")
+ val scheduledEx = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
+
+ private def confWithFallback(conf: Config): Config = {
+ conf.withFallback(config)
+ }
+
+ /**
+ * TODO: we need some way to handle malformed configuration for storage.
+ */
+ val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = {
+ val labels = Label.findAll()
+ val services = Service.findAll()
+
+ val labelConfigs = labels.flatMap(_.toStorageConfig)
+ val serviceConfigs = services.flatMap(_.toStorageConfig)
+
+ val configs = (labelConfigs ++ serviceConfigs).map { conf =>
+ confWithFallback(conf)
+ }.toSet
+
+ val pools = new java.util.HashMap[Config, Storage[_, _]]()
+ configs.foreach { config =>
+ pools.put(config, Graph.initStorage(this, config)(ec))
+ }
+
+ val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]()
+
+ labels.foreach { label =>
+ if (label.storageConfigOpt.isDefined) {
+ m += (s"label:${label.label}" -> pools(label.storageConfigOpt.get))
+ }
+ }
+
+ services.foreach { service =>
+ if (service.storageConfigOpt.isDefined) {
+ m += (s"service:${service.serviceName}" -> pools(service.storageConfigOpt.get))
+ }
+ }
+
+ m
+ }
+
+ val defaultStorage: Storage[_, _] = Graph.initStorage(this, config)(ec)
+
+ /** QueryLevel FutureCache */
+ val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty)
for {
entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey)
(k, v) = (entry.getKey, entry.getValue)
} logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
- /** select */
- def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = storage.checkEdges(params)
+ def getStorage(service: Service): Storage[_, _] = {
+ storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
+ }
+
+ def getStorage(label: Label): Storage[_, _] = {
+ storagePool.getOrElse(s"label:${label.label}", defaultStorage)
+ }
+
+ def flushStorage(): Unit = {
+ storagePool.foreach { case (_, storage) =>
+
+ /** flush is blocking */
+ storage.flush()
+ }
+ }
+
+ def fallback = Future.successful(StepResult.Empty)
+
+ def checkEdges(edges: Seq[Edge]): Future[StepResult] = {
+ val futures = for {
+ edge <- edges
+ } yield {
+ fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
+ edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+ }
+ }
+
+ Future.sequence(futures).map { edgeWithScoreLs =>
+ val edgeWithScores = edgeWithScoreLs.flatten
+ StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil)
+ }
+ }
+
+ // def checkEdges(edges: Seq[Edge]): Future[StepResult] = storage.checkEdges(edges)
+
+ def getEdges(q: Query): Future[StepResult] = {
+ Try {
+ if (q.steps.isEmpty) {
+ // TODO: this should be a get-vertex query.
+ fallback
+ } else {
+ val filterOutFuture = q.queryOption.filterOutQuery match {
+ case None => fallback
+ case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+ }
+ for {
+ stepResult <- getEdgesStepInner(q)
+ filterOutInnerResult <- filterOutFuture
+ } yield {
+ if (filterOutInnerResult.isEmpty) stepResult
+ else StepResult.filterOut(this, q.queryOption, stepResult, filterOutInnerResult)
+ }
+ }
+ } recover {
+ case e: Exception =>
+ logger.error(s"getEdgesAsync: $e", e)
+ fallback
+ } get
+ }
+
+ def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
+ Try {
+ if (q.steps.isEmpty) fallback
+ else {
+
+ val queryOption = q.queryOption
+ def fetch: Future[StepResult] = {
+ val startStepInnerResult = QueryResult.fromVertices(this, q)
+ q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
+ for {
+ prevStepInnerResult <- prevStepInnerResultFuture
+ currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult)
+ } yield {
+ currentStepInnerResult.copy(
+ accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors,
+ failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount
+ )
+ }
+ }
+ }
+
+ fetch
+ }
+ } recover {
+ case e: Exception =>
+ logger.error(s"getEdgesAsync: $e", e)
+ fallback
+ } get
+ }
+
+ def fetchStep(orgQuery: Query,
+ stepIdx: Int,
+ stepInnerResult: StepResult,
+ buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
+ if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
+ else {
+ val edgeWithScoreLs = stepInnerResult.edgeWithScores
+
+ val q = orgQuery
+ val queryOption = orgQuery.queryOption
+ val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
+ val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
+ val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
+ val step = q.steps(stepIdx)
+
+ val alreadyVisited =
+ if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
+ else alreadyVisitedVertices(stepInnerResult.edgeWithScores)
+
+ val initial = (Map.empty[Vertex, Double], Map.empty[Vertex, ArrayBuffer[EdgeWithScore]])
+ val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) =>
+ val key = edgeWithScore.edge.tgtVertex
+ val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score
+ val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore])
+ buffer += edgeWithScore
+ (sum + (key -> newScore), group + (key -> buffer))
+ }
+ val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold)
+ val prevStepTgtVertexIdEdges = grouped.map(t => t._1.id -> t._2)
+
+ val nextStepSrcVertices = if (prevStepLimit >= 0) {
+ groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
+ } else {
+ groupedByFiltered.toSeq
+ }
+
+ val queryRequests = for {
+ (vertex, prevStepScore) <- nextStepSrcVertices
+ queryParam <- step.queryParams
+ } yield {
+ val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
+ val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0
+ QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight)
+ }
+
+ val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
+
+ filterEdges(orgQuery, stepIdx, queryRequests,
+ fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec)
+ }
+ }
+
+
+ /**
+ * Responsible for firing parallel fetch calls into storage and creating a future that will return the merged result.
+ *
+ * @param queryRequests
+ * @param prevStepEdges
+ * @return
+ */
+ def fetches(queryRequests: Seq[QueryRequest],
+ prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
+
+ val reqWithIdxs = queryRequests.zipWithIndex
+ val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label)
+ val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
+ for {
+ prev <- prevFuture
+ cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
+ } yield {
+ prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
+ }
+ }
+ aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) }
+ }
+
+
+ def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
+ Try {
+ if (mq.queries.isEmpty) fallback
+ else {
+ val filterOutFuture = mq.queryOption.filterOutQuery match {
+ case None => fallback
+ case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+ }
+
+ val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
+ for {
+ multiQueryResults <- multiQueryFutures
+ filterOutInnerResult <- filterOutFuture
+ } yield {
+ StepResult.merges(mq.queryOption, multiQueryResults, mq.weights, filterOutInnerResult)
+ }
+ }
+ } recover {
+ case e: Exception =>
+ logger.error(s"getEdgesAsync: $e", e)
+ fallback
+ } get
+ }
+
+
+ def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = {
+ /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
+ * so use empty cacheKey.
+ * */
+ val queryParam = QueryParam(labelName = edge.label.label,
+ direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
+ tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
+ cacheTTLInMillis = -1)
+ val q = Query.toQuery(Seq(edge.srcVertex), queryParam)
+ val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
+
+ val storage = getStorage(edge.label)
+ storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
+ val (edgeOpt, kvOpt) =
+ if (kvs.isEmpty) (None, None)
+ else {
+ val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
+ val _kvOpt = kvs.headOption
+ (snapshotEdgeOpt, _kvOpt)
+ }
+ (queryParam, edgeOpt, kvOpt)
+ } recoverWith { case ex: Throwable =>
+ logger.error(s"fetchQueryParam failed. fallback return.", ex)
+ throw FetchTimeoutException(s"${edge.toLogString}")
+ }
+ }
+
+ def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
+ val verticesWithIdx = vertices.zipWithIndex
+ val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
+ getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
+ }
+
+ Future.sequence(futures).map { ls =>
+ ls.flatten.toSeq.sortBy(_._2).map(_._1)
+ }
+ }
+
+ /** mutate */
+ def deleteAllAdjacentEdges(srcVertices: Seq[Vertex],
+ labels: Seq[Label],
+ dir: Int,
+ ts: Long): Future[Boolean] = {
+
+ val requestTs = ts
+ val vertices = srcVertices
+ /** create query per label */
+ val queries = for {
+ label <- labels
+ } yield {
+ val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir),
+ offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw)
+ val step = Step(List(queryParam))
+ Query(vertices, Vector(step))
+ }
+
+ // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
+ val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
+ fetchAndDeleteAll(queries, requestTs)
+ } { case (allDeleted, deleteSuccess) =>
+ allDeleted && deleteSuccess
+ }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
+
+ retryFuture onFailure {
+ case ex =>
+ logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
+ }
- def getEdges(q: Query): Future[StepResult] = storage.getEdges(q)
+ retryFuture
+ }
- def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = storage.getEdgesMultiQuery(mq)
+ def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
+ val future = for {
+ stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true)))
+ (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
+ } yield {
+ // logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
+ (allDeleted, ret)
+ }
- def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices)
+ Extensions.retryOnFailure(MaxRetryNum) {
+ future
+ } {
+ logger.error(s"fetch and deleteAll failed.")
+ (true, false)
+ }
- /** write */
- def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
- storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
+ }
- def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[Boolean]] =
- storage.mutateElements(elements, withWait)
+ def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
+ requestTs: Long): Future[(Boolean, Boolean)] = {
+ stepInnerResultLs.foreach { stepInnerResult =>
+ if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
+ }
+ val futures = for {
+ stepInnerResult <- stepInnerResultLs
+ deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
+ if deleteStepInnerResult.edgeWithScores.nonEmpty
+ } yield {
+ val head = deleteStepInnerResult.edgeWithScores.head
+ val label = head.edge.label
+ val ret = label.schemaVersion match {
+ case HBaseType.VERSION3 | HBaseType.VERSION4 =>
+ if (label.consistencyLevel == "strong") {
+ /**
+ * read: snapshotEdge on queryResult = O(N)
+ * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
+ */
+ mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity))
+ } else {
+ getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
+ }
+ case _ =>
- def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
+ /**
+ * read: x
+ * write: N x (1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
+ */
+ getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
+ }
+ ret
+ }
- def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateVertices(vertices, withWait)
+ if (futures.isEmpty) {
+ // all deleted.
+ Future.successful(true -> true)
+ } else {
+ Future.sequence(futures).map { rets => false -> rets.forall(identity) }
+ }
+ }
- def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges, withWait)
+ def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = {
+ val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
+ (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
+ }
+ if (filtered.isEmpty) StepResult.Empty
+ else {
+ val head = filtered.head
+ val label = head.edge.label
+ val edgeWithScoreLs = filtered.map { edgeWithScore =>
+ val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
+ case "strong" =>
+ val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
+ Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
+ (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
+ case _ =>
+ val oldEdge = edgeWithScore.edge
+ (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
+ }
+
+ val copiedEdge =
+ edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
+
+ val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
+ // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
+ edgeToDelete
+ }
+ //Degree edge?
+ StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false)
+ }
+ }
+
+ // def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
+ // storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
+
+ def mutateElements(elements: Seq[GraphElement],
+ withWait: Boolean = false): Future[Seq[Boolean]] = {
+
+ val edgeBuffer = ArrayBuffer[(Edge, Int)]()
+ val vertexBuffer = ArrayBuffer[(Vertex, Int)]()
+
+ elements.zipWithIndex.foreach {
+ case (e: Edge, idx: Int) => edgeBuffer.append((e, idx))
+ case (v: Vertex, idx: Int) => vertexBuffer.append((v, idx))
+ case any@_ => logger.error(s"Unknown type: ${any}")
+ }
+
+ val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result =>
+ edgeBuffer.map(_._2).zip(result)
+ }
+
+ val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result =>
+ vertexBuffer.map(_._2).zip(result)
+ }
+
+ val graphFuture = for {
+ edgesMutated <- edgeFutureWithIdx
+ verticesMutated <- vertexFutureWithIdx
+ } yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2)
+
+ graphFuture
+
+ }
+
+ // def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
+
+ def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = {
+ val edgeWithIdxs = edges.zipWithIndex
+
+ val (strongEdges, weakEdges) =
+ edgeWithIdxs.partition { case (edge, idx) =>
+ val e = edge
+ e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")
+ }
+
+ val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.label.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
+ val futures = edgeWithIdxs.groupBy(_._1.label).map { case (label, edgeGroup) =>
+ val storage = getStorage(label)
+ val edges = edgeGroup.map(_._1)
+ val idxs = edgeGroup.map(_._2)
+
+ /** multiple edges with weak consistency level will be processed as batch */
+ val mutations = edges.flatMap { edge =>
+ val (_, edgeUpdate) =
+ if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge)
+ else Edge.buildOperation(None, Seq(edge))
+
+ storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate) ++ storage.snapshotEdgeMutations(edgeUpdate) ++ storage.increments(edgeUpdate)
+ }
+
+ storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+ idxs.map(idx => idx -> ret)
+ }
+ }
+ Future.sequence(futures)
+ }
+ val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") }
+
+ val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
+ deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts).map(idx -> _)
+ }
+
+ val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.label }.map { case (label, edgeGroup) =>
+ val edges = edgeGroup.map(_._1)
+ val idxs = edgeGroup.map(_._2)
+ val storage = getStorage(label)
+ storage.mutateStrongEdges(edges, withWait = true).map { rets =>
+ idxs.zip(rets)
+ }
+ }
+
+ for {
+ weak <- Future.sequence(weakEdgesFutures)
+ deleteAll <- Future.sequence(deleteAllFutures)
+ strong <- Future.sequence(strongEdgesFutures)
+ } yield {
+ (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2)
+ }
+ }
+
+ def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = {
+ val verticesWithIdx = vertices.zipWithIndex
+ val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
+ getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
+ }
+ Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
+ }
+
+ def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
+ val edgesWithIdx = edges.zipWithIndex
+ val futures = edgesWithIdx.groupBy { case (e, idx) => e.label }.map { case (label, edgeGroup) =>
+ getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+ }
+ Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
+ }
+
+ def updateDegree(edge: Edge, degreeVal: Long = 0): Future[Boolean] = {
+ val label = edge.label
+
+ val storage = getStorage(label)
+ val kvs = storage.buildDegreePuts(edge, degreeVal)
+
+ storage.writeToStorage(edge.label.service.cluster, kvs, withWait = true)
+ }
def shutdown(): Unit = {
- storage.flush()
+ flushStorage()
Model.shutdown()
}
+
+ def addEdge(srcId: Any,
+ tgtId: Any,
+ labelName: String,
+ direction: String = "out",
+ props: Map[String, Any] = Map.empty,
+ ts: Long = System.currentTimeMillis(),
+ operation: String = "insert",
+ withWait: Boolean = true): Future[Boolean] = {
+
+ val innerEdges = Seq(Edge.toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation))
+ mutateEdges(innerEdges, withWait).map(_.headOption.getOrElse(false))
+ }
+
+ def addVertex(serviceName: String,
+ columnName: String,
+ id: Any,
+ props: Map[String, Any] = Map.empty,
+ ts: Long = System.currentTimeMillis(),
+ operation: String = "insert",
+ withWait: Boolean = true): Future[Boolean] = {
+ val innerVertices = Seq(Vertex.toVertex(serviceName, columnName, id, props.toMap, ts, operation))
+ mutateVertices(innerVertices, withWait).map(_.headOption.getOrElse(false))
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
index 2f090cf..2cc1063 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
@@ -20,6 +20,24 @@
package org.apache.s2graph.core
object GraphExceptions {
+ var fillStckTrace = true
+ class BaseException(msg : String) extends Exception(msg){
+ override def fillInStackTrace : Exception = {
+ if(fillStckTrace) super.fillInStackTrace()
+ this
+ }
+ }
+ class NoStackException(msg : String) extends Exception(msg){
+ override def fillInStackTrace : Exception = {
+ this
+ }
+ }
+
+ class NoStackCauseException(msg : String, ex: Throwable ) extends Exception(msg, ex){
+ override def fillInStackTrace : Exception = {
+ this
+ }
+ }
case class JsonParseException(msg: String) extends Exception(msg)
@@ -43,7 +61,11 @@ object GraphExceptions {
case class InvalidHTableException(msg: String) extends Exception(msg)
- case class FetchTimeoutException(msg: String) extends Exception(msg)
+ case class FetchTimeoutException(msg: String) extends NoStackException(msg)
case class DropRequestException(msg: String) extends Exception(msg)
+
+ case class FetchAllStepFailException(msg: String) extends Exception(msg)
+
+ case class AccessDeniedException(amsg: String) extends Exception(amsg)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
index ebfee7a..939b596 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
@@ -50,8 +50,8 @@ object GraphUtil {
val d = direction.trim().toLowerCase match {
case "directed" | "d" => Some(0)
case "undirected" | "u" => Some(2)
- case "out" => Some(0)
- case "in" => Some(1)
+ case "out" | "o" => Some(0)
+ case "in" | "i" => Some(1)
case _ => None
}
d.map(x => x.toByte)
@@ -61,8 +61,8 @@ object GraphUtil {
direction.trim().toLowerCase match {
case "directed" | "d" => 0
case "undirected" | "u" => 2
- case "out" => 0
- case "in" => 1
+ case "out" | "o" => 0
+ case "in" | "i" => 1
case _ => 2
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 9ef7c14..89acc63 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -25,6 +25,7 @@ import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types.HBaseType._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.utils.logger
import play.api.libs.json.Reads._
import play.api.libs.json._
@@ -104,8 +105,8 @@ object Management {
}
}
- def findLabel(labelName: String): Option[Label] = {
- Label.findByName(labelName, useCache = false)
+ def findLabel(labelName: String, useCache: Boolean = false): Option[Label] = {
+ Label.findByName(labelName, useCache = useCache)
}
def deleteLabel(labelName: String) = {
@@ -117,6 +118,16 @@ object Management {
}
}
+ def markDeletedLabel(labelName: String) = {
+ Model withTx { implicit session =>
+ Label.findByName(labelName, useCache = false).foreach { label =>
+ // rename & delete_at column filled with current time
+ Label.markDeleted(label)
+ }
+ labelName
+ }
+ }
+
def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = {
Model withTx { implicit session =>
val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found"))
@@ -205,12 +216,12 @@ object Management {
}
- def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(Byte, InnerValLike)] = {
+ def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(LabelMeta, InnerValLike)] = {
val props = for {
(k, v) <- js
meta <- label.metaPropsInvMap.get(k)
innerVal <- jsValueToInnerVal(v, meta.dataType, label.schemaVersion)
- } yield (meta.seq, innerVal)
+ } yield (meta, innerVal)
props
}
@@ -248,15 +259,18 @@ object Management {
class Management(graph: Graph) {
import Management._
- val storage = graph.storage
- def createTable(zkAddr: String,
+ def createStorageTable(zkAddr: String,
tableName: String,
cfs: List[String],
regionMultiplier: Int,
ttl: Option[Int],
- compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit =
- storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm)
+ compressionAlgorithm: String = DefaultCompressionAlgorithm,
+ replicationScopeOpt: Option[Int] = None,
+ totalRegionCount: Option[Int] = None): Unit = {
+ graph.defaultStorage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm, replicationScopeOpt, totalRegionCount)
+ }
+
/** HBase specific code */
def createService(serviceName: String,
@@ -265,9 +279,9 @@ class Management(graph: Graph) {
compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = {
Model withTx { implicit session =>
- val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
- /* create hbase table for service */
- storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
+ val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm)
+ /** create hbase table for service */
+ graph.getStorage(service).createTable(service.cluster, service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
service
}
}
@@ -292,35 +306,26 @@ class Management(graph: Graph) {
compressionAlgorithm: String = "gz",
options: Option[String]): Try[Label] = {
- val labelOpt = Label.findByName(label, useCache = false)
+ if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : ${LABEL_NAME_MAX_LENGTH}} )")
+ if (hTableName.isEmpty && hTableTTL.isDefined) throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
+ val labelOpt = Label.findByName(label, useCache = false)
Model withTx { implicit session =>
- labelOpt match {
- case Some(l) =>
- throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
- case None =>
- /** create all models */
- if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : 40 )")
- val newLabel = Label.insertAll(label,
- srcServiceName, srcColumnName, srcColumnType,
- tgtServiceName, tgtColumnName, tgtColumnType,
- isDirected, serviceName, indices, props, consistencyLevel,
- hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
-
- /* create hbase table */
- val service = newLabel.service
- (hTableName, hTableTTL) match {
- case (None, None) => // do nothing
- case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
- case (Some(hbaseTableName), None) =>
- // create own hbase table with default ttl on service level.
- storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
- case (Some(hbaseTableName), Some(hbaseTableTTL)) =>
- // create own hbase table with own ttl.
- storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm)
- }
- newLabel
- }
+ if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.")
+
+ /** create all models */
+ val newLabel = Label.insertAll(label,
+ srcServiceName, srcColumnName, srcColumnType,
+ tgtServiceName, tgtColumnName, tgtColumnType,
+ isDirected, serviceName, indices, props, consistencyLevel,
+ hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
+
+ /** create hbase table */
+ val storage = graph.getStorage(newLabel)
+ val service = newLabel.service
+ storage.createTable(service.cluster, newLabel.hbaseTableName, List("e", "v"), service.preSplitSize, newLabel.hTableTTL, newLabel.compressionAlgorithm)
+
+ newLabel
}
}
@@ -331,12 +336,11 @@ class Management(graph: Graph) {
* copy label if oldLabel exists and newLabel does not exist.
* copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
*/
- def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
+ def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]): Try[Label] = {
val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
- if (Label.findByName(newLabelName, useCache = false).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
- val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
- val allIndices = old.indices.map { index => Index(index.name, index.propNames) }
+ val allProps = old.metas(useCache = false).map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
+ val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames) }
createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
@@ -344,4 +348,13 @@ class Management(graph: Graph) {
allIndices, allProps,
old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm, old.options)
}
+
+ def getCurrentStorageInfo(labelName: String): Try[Map[String, String]] = for {
+ label <- Try(Label.findByName(labelName, useCache = false).get)
+ } yield {
+ val storage = graph.getStorage(label)
+ storage.info
+ }
+
}
+
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 7cc2420..6c8563c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -19,12 +19,18 @@
package org.apache.s2graph.core
-import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import java.util.Base64
+
+import com.google.protobuf.ByteString
+import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
import org.apache.s2graph.core.mysqls._
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.rest.RequestParser
+import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{Json, _}
+import scala.collection.JavaConversions._
import scala.collection.immutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -40,74 +46,100 @@ object PostProcess {
* Result Entity score field name
*/
val emptyDegrees = Seq.empty[JsValue]
- val timeoutResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isTimeout" -> true)
- val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isEmpty" -> true)
+ val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isEmpty" -> true, "rpcFail" -> 0)
+
def badRequestResults(ex: => Exception) = ex match {
case ex: BadQueryException => Json.obj("message" -> ex.msg)
case _ => Json.obj("message" -> ex.getMessage)
}
- val SCORE_FIELD_NAME = "scoreSum"
- val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props")
-
-
def s2EdgeParent(graph: Graph,
+ queryOption: QueryOption,
parentEdges: Seq[EdgeWithScore]): JsValue = {
if (parentEdges.isEmpty) JsNull
else {
val ancestors = for {
current <- parentEdges
- parents = s2EdgeParent(graph, current.edge.parentEdges) if parents != JsNull
+ parents = s2EdgeParent(graph, queryOption, current.edge.parentEdges) if parents != JsNull
} yield {
val s2Edge = current.edge.originalEdgeOpt.getOrElse(current.edge)
- s2EdgeToJsValue(s2Edge, current.score, false, parents = parents)
+ s2EdgeToJsValue(queryOption, current.copy(edge = s2Edge), false, parents = parents, checkSelectColumns = true)
}
Json.toJson(ancestors)
}
}
- def s2EdgeToJsValue(s2Edge: Edge,
- score: Double,
+ def s2EdgeToJsValue(queryOption: QueryOption,
+ edgeWithScore: EdgeWithScore,
isDegree: Boolean = false,
- parents: JsValue = JsNull): JsValue = {
+ parents: JsValue = JsNull,
+ checkSelectColumns: Boolean = false): JsValue = {
+ // val builder = immutable.Map.newBuilder[String, JsValue]
+ val builder = ArrayBuffer.empty[(String, JsValue)]
+ val s2Edge = edgeWithScore.edge
+ val score = edgeWithScore.score
+ val label = edgeWithScore.label
if (isDegree) {
- Json.obj(
- "from" -> anyValToJsValue(s2Edge.srcId),
- "label" -> s2Edge.labelName,
- LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degreeSeq).innerVal.value)
- )
+ builder += ("from" -> anyValToJsValue(s2Edge.srcId).get)
+ builder += ("label" -> anyValToJsValue(label.label).get)
+ builder += ("direction" -> anyValToJsValue(s2Edge.direction).get)
+ builder += (LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degree).innerVal.value).get)
+ JsObject(builder)
} else {
- Json.obj("from" -> anyValToJsValue(s2Edge.srcId),
- "to" -> anyValToJsValue(s2Edge.tgtId),
- "label" -> s2Edge.labelName,
- "score" -> score,
- "props" -> JSONParser.propertiesToJson(s2Edge.properties),
- "direction" -> s2Edge.direction,
- "timestamp" -> anyValToJsValue(s2Edge.ts),
- "parents" -> parents
- )
- }
- }
+ if (queryOption.withScore) builder += ("score" -> anyValToJsValue(score).get)
- def withImpressionId(queryOption: QueryOption,
- size: Int,
- degrees: Seq[JsValue],
- results: Seq[JsValue]): JsValue = {
- queryOption.impIdOpt match {
- case None => Json.obj(
- "size" -> size,
- "degrees" -> degrees,
- "results" -> results
- )
- case Some(impId) =>
- Json.obj(
- "size" -> size,
- "degrees" -> degrees,
- "results" -> results,
- Experiment.ImpressionKey -> impId
- )
+ if (queryOption.selectColumns.isEmpty) {
+ builder += ("from" -> anyValToJsValue(s2Edge.srcId).get)
+ builder += ("to" -> anyValToJsValue(s2Edge.tgtId).get)
+ builder += ("label" -> anyValToJsValue(label.label).get)
+
+ val innerProps = ArrayBuffer.empty[(String, JsValue)]
+ for {
+ (labelMeta, v) <- edgeWithScore.edge.propsWithTs
+ jsValue <- anyValToJsValue(v.innerVal.value)
+ } {
+ innerProps += (labelMeta.name -> jsValue)
+ }
+
+
+ builder += ("props" -> JsObject(innerProps))
+ builder += ("direction" -> anyValToJsValue(s2Edge.direction).get)
+ builder += ("timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get)
+ builder += ("_timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) // backward compatibility
+ if (parents != JsNull) builder += ("parents" -> parents)
+ // Json.toJson(builder.result())
+ JsObject(builder)
+ } else {
+ queryOption.selectColumnsMap.foreach { case (columnName, _) =>
+ columnName match {
+ case "from" => builder += ("from" -> anyValToJsValue(s2Edge.srcId).get)
+ case "_from" => builder += ("_from" -> anyValToJsValue(s2Edge.srcId).get)
+ case "to" => builder += ("to" -> anyValToJsValue(s2Edge.tgtId).get)
+ case "_to" => builder += ("_to" -> anyValToJsValue(s2Edge.tgtId).get)
+ case "label" => builder += ("label" -> anyValToJsValue(label.label).get)
+ case "direction" => builder += ("direction" -> anyValToJsValue(s2Edge.direction).get)
+ case "timestamp" => builder += ("timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get)
+ case "_timestamp" => builder += ("_timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get)
+ case _ => // should not happen
+
+ }
+ }
+ val innerProps = ArrayBuffer.empty[(String, JsValue)]
+ for {
+ (labelMeta, v) <- edgeWithScore.edge.propsWithTs
+ if !checkSelectColumns || queryOption.selectColumnsMap.contains(labelMeta.name)
+ jsValue <- anyValToJsValue(v.innerVal.value)
+ } {
+ innerProps += (labelMeta.name -> jsValue)
+ }
+
+ builder += ("props" -> JsObject(innerProps))
+ if (parents != JsNull) builder += ("parents" -> parents)
+ JsObject(builder)
+ }
}
}
+
def s2VertexToJson(s2Vertex: Vertex): Option[JsValue] = {
val props = for {
(k, v) <- s2Vertex.properties
@@ -140,6 +172,7 @@ object PostProcess {
val kvs = new ArrayBuffer[(String, JsValue)]()
+
kvs.append("size" -> JsNumber(size))
kvs.append("degrees" -> JsArray(degrees))
kvs.append("results" -> JsArray(results))
@@ -149,28 +182,71 @@ object PostProcess {
JsObject(kvs)
}
- def toJson(graph: Graph,
- queryOption: QueryOption,
- stepResult: StepResult): JsValue = {
+ def buildJsonWith(js: JsValue)(implicit fn: (String, JsValue) => JsValue): JsValue = js match {
+ case JsObject(obj) => JsObject(obj.map { case (k, v) => k -> buildJsonWith(fn(k, v)) })
+ case JsArray(arr) => JsArray(arr.map(buildJsonWith(_)))
+ case _ => js
+ }
+
+ def toJson(orgQuery: Option[JsValue])(graph: Graph,
+ queryOption: QueryOption,
+ stepResult: StepResult): JsValue = {
+
+ // [[cursor, cursor], [cursor]]
+ lazy val cursors: Seq[Seq[String]] = stepResult.accumulatedCursors.map { stepCursors =>
+ stepCursors.map { cursor => new String(Base64.getEncoder.encode(cursor)) }
+ }
+
+ lazy val cursorJson: JsValue = Json.toJson(cursors)
+
+ // build nextQuery with (original query + cursors)
+ lazy val nextQuery: Option[JsValue] = {
+ if (cursors.exists { stepCursors => stepCursors.exists(_ != "") }) {
+ val cursorIter = cursors.iterator
+ orgQuery.map { query =>
+ buildJsonWith(query) { (key, js) =>
+ if (key == "step") {
+ val currentCursor = cursorIter.next
+ val res = js.as[Seq[JsObject]].toStream.zip(currentCursor).filterNot(_._2 == "").map { case (obj, cursor) =>
+ val label = (obj \ "label").as[String]
+ if (Label.findByName(label).get.schemaVersion == "v4") obj + ("cursor" -> JsString(cursor))
+ else {
+ val limit = (obj \ "limit").asOpt[Int].getOrElse(RequestParser.defaultLimit)
+ val offset = (obj \ "offset").asOpt[Int].getOrElse(0)
+ obj + ("offset" -> JsNumber(offset + limit))
+ }
+ }
+ JsArray(res)
+ } else js
+ }
+ }
+ } else Option(JsNull)
+ }
+
+ val limitOpt = queryOption.limitOpt
+ val selectColumns = queryOption.selectColumnsMap
val degrees =
- if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(t.s2Edge, t.score, true))
+ if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(queryOption, t, true, JsNull))
else emptyDegrees
if (queryOption.groupBy.keys.isEmpty) {
// no group by specified on query.
+ val results = if (limitOpt.isDefined) stepResult.edgeWithScores.take(limitOpt.get) else stepResult.edgeWithScores
+ val ls = results.map { t =>
+ val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.parentEdges) else JsNull
- val ls = stepResult.results.map { t =>
- val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
- s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
+ s2EdgeToJsValue(queryOption, t, false, parents)
}
- withImpressionId(queryOption, ls.size, degrees, ls)
+
+ withOptionalFields(queryOption, ls.size, degrees, ls, stepResult.failCount, cursorJson, nextQuery)
} else {
+ val grouped = if (limitOpt.isDefined) stepResult.grouped.take(limitOpt.get) else stepResult.grouped
val results =
for {
- (groupByValues, (scoreSum, edges)) <- stepResult.grouped
+ (groupByValues, (scoreSum, edges)) <- grouped
} yield {
val groupByKeyValues = queryOption.groupBy.keys.zip(groupByValues).map { case (k, valueOpt) =>
k -> valueOpt.flatMap(anyValToJsValue).getOrElse(JsNull)
@@ -185,8 +261,8 @@ object PostProcess {
)
} else {
val agg = edges.map { t =>
- val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
- s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
+ val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.parentEdges) else JsNull
+ s2EdgeToJsValue(queryOption, t, false, parents)
}
val aggJson = Json.toJson(agg)
Json.obj(
@@ -196,7 +272,8 @@ object PostProcess {
)
}
}
- withImpressionId(queryOption, results.size, degrees, results)
+
+ withOptionalFields(queryOption, results.size, degrees, results, stepResult.failCount, cursorJson, nextQuery)
}
}
-}
+}
\ No newline at end of file