You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/11/19 02:28:49 UTC
[06/23] incubator-s2graph git commit: move S2Graph global helpers into PostProcess.
move S2Graph global helpers into PostProcess.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/42b7702e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/42b7702e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/42b7702e
Branch: refs/heads/master
Commit: 42b7702e5d32e32b8888067d21031b9eb13ce1c1
Parents: cdfa0c3
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 3 20:48:07 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Nov 3 20:48:07 2017 +0900
----------------------------------------------------------------------
.../org/apache/s2graph/core/PostProcess.scala | 386 +++++++++++++++++-
.../scala/org/apache/s2graph/core/S2Graph.scala | 390 +------------------
.../apache/s2graph/core/storage/StorageIO.scala | 2 +-
3 files changed, 392 insertions(+), 386 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/42b7702e/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 3017749..4549d84 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -24,15 +24,19 @@ import java.util.Base64
import com.google.protobuf.ByteString
import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
import org.apache.s2graph.core.mysqls._
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
+import org.apache.s2graph.core.types._
import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{Json, _}
+import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.immutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.concurrent.Future
+import scala.util.Random
object PostProcess {
@@ -277,4 +281,384 @@ object PostProcess {
withOptionalFields(queryOption, results.size, degrees, results, stepResult.failCount, cursorJson, nextQuery)
}
}
+
+ /** Global helper functions */
+ @tailrec
+ final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
+ if (range < sampleNumber || set.size == sampleNumber) set
+ else randomInt(sampleNumber, range, set + Random.nextInt(range))
+ }
+
+ def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
+ if (edges.size <= n) {
+ edges
+ } else {
+ val plainEdges = if (queryRequest.queryParam.offset == 0) {
+ edges.tail
+ } else edges
+
+ val randoms = randomInt(n, plainEdges.size)
+ var samples = List.empty[EdgeWithScore]
+ var idx = 0
+ plainEdges.foreach { e =>
+ if (randoms.contains(idx)) samples = e :: samples
+ idx += 1
+ }
+ samples
+ }
+ }
+
+ def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+ val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
+ edgeWithScores.map { edgeWithScore =>
+ edgeWithScore.copy(score = edgeWithScore.score / sum)
+ }
+ }
+
+ def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2VertexLike), Boolean] = {
+ val vertices = for {
+ edgeWithScore <- edgeWithScoreLs
+ edge = edgeWithScore.edge
+ vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
+ } yield (edge.labelWithDir, vertex) -> true
+
+ vertices.toMap
+ }
+
+ /** common methods for filter out, transform, aggregate queryResult */
+ def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = {
+ for {
+ convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
+ } yield convertedEdge
+ }
+
+ def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = {
+ /* process time decay */
+ val tsVal = queryParam.timeDecay match {
+ case None => 1.0
+ case Some(timeDecay) =>
+ val tsVal = try {
+ val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name)
+ innerValWithTsOpt.map { innerValWithTs =>
+ val innerVal = innerValWithTs.innerVal
+ timeDecay.labelMeta.dataType match {
+ case InnerVal.LONG => innerVal.value match {
+ case n: BigDecimal => n.bigDecimal.longValue()
+ case _ => innerVal.toString().toLong
+ }
+ case _ => innerVal.toString().toLong
+ }
+ } getOrElse (edge.ts)
+ } catch {
+ case e: Exception =>
+ logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
+ edge.ts
+ }
+ val timeDiff = queryParam.timestamp - tsVal
+ timeDecay.decay(timeDiff)
+ }
+
+ tsVal
+ }
+
+ def processDuplicates[R](queryParam: QueryParam,
+ duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = {
+
+ if (queryParam.label.consistencyLevel != "strong") {
+ //TODO:
+ queryParam.duplicatePolicy match {
+ case DuplicatePolicy.First => Seq(duplicates.head)
+ case DuplicatePolicy.Raw => duplicates
+ case DuplicatePolicy.CountSum =>
+ val countSum = duplicates.size
+ val (headFilterHashKey, headEdgeWithScore) = duplicates.head
+ Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum))
+ case _ =>
+ val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) }
+ val (headFilterHashKey, headEdgeWithScore) = duplicates.head
+ Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum))
+ }
+ } else {
+ duplicates
+ }
+ }
+
+ def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = {
+ val src = edge.srcVertex.innerId.hashCode()
+ val tgt = edge.tgtVertex.innerId.hashCode()
+ val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
+ val filterHashKey = (src, tgt)
+
+ (hashKey, filterHashKey)
+ }
+
+ def filterEdges(q: Query,
+ stepIdx: Int,
+ queryRequests: Seq[QueryRequest],
+ queryResultLsFuture: Future[Seq[StepResult]],
+ queryParams: Seq[QueryParam],
+ alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean] = Map.empty,
+ buildLastStepInnerResult: Boolean = true,
+ parentEdges: Map[VertexId, Seq[EdgeWithScore]])
+ (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = {
+
+ queryResultLsFuture.map { queryRequestWithResultLs =>
+ val (cursors, failCount) = {
+ val _cursors = ArrayBuffer.empty[Array[Byte]]
+ var _failCount = 0
+
+ queryRequestWithResultLs.foreach { stepResult =>
+ _cursors.append(stepResult.cursors: _*)
+ _failCount += stepResult.failCount
+ }
+
+ _cursors -> _failCount
+ }
+
+
+ if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount)
+ else {
+ val isLastStep = stepIdx == q.steps.size - 1
+ val queryOption = q.queryOption
+ val step = q.steps(stepIdx)
+
+ val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs)
+ val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult
+ val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
+
+ if (shouldBuildInnerResults) {
+ val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
+ edgeWithScore
+ }
+
+ /* process step group by */
+ val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
+ StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
+
+ } else {
+ val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
+ val edge = edgeWithScore.edge
+ val score = edgeWithScore.score
+ val label = edgeWithScore.label
+
+ /* Select */
+ val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
+
+ // val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+ val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
+
+ val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
+ /* OrderBy */
+ val orderByValues =
+ if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
+ else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
+
+ /* StepGroupBy */
+ val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
+
+ /* GroupBy */
+ val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
+
+ /* FilterOut */
+ val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
+
+ newEdgeWithScore.copy(orderByValues = orderByValues,
+ stepGroupByValues = stepGroupByValues,
+ groupByValues = groupByValues,
+ filterOutValues = filterOutValues)
+ }
+
+ /* process step group by */
+ val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
+
+ /* process ordered list */
+ val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
+
+ /* process grouped list */
+ val grouped =
+ if (queryOption.groupBy.keys.isEmpty) Nil
+ else {
+ val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]()
+ results.groupBy { edgeWithScore =>
+ // edgeWithScore.groupByValues.map(_.map(_.toString))
+ edgeWithScore.groupByValues
+ }.foreach { case (k, ls) =>
+ val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption)
+
+ val newScoreSum = scoreSum
+
+ /*
+ * watch out here. by calling toString on Any, we lose type information which will be used
+ * later for toJson.
+ */
+ if (merged.nonEmpty) {
+ val newKey = merged.head.groupByValues
+ agg += ((newKey, (newScoreSum, merged)))
+ }
+ }
+ agg.toSeq.sortBy(_._2._1 * -1)
+ }
+
+ StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount)
+ }
+ }
+ }
+ }
+
+ def toEdgeWithScores(queryRequest: QueryRequest,
+ stepResult: StepResult,
+ parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = {
+ val queryOption = queryRequest.query.queryOption
+ val queryParam = queryRequest.queryParam
+ val prevScore = queryRequest.prevStepScore
+ val labelWeight = queryRequest.labelWeight
+ val edgeWithScores = stepResult.edgeWithScores
+
+ val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
+ val parents = if (shouldBuildParents) {
+ parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore =>
+ val edge = edgeWithScore.edge
+ val score = edgeWithScore.score
+ val label = edgeWithScore.label
+
+ /* Select */
+ val mergedPropsWithTs =
+ if (queryOption.selectColumns.isEmpty) {
+ edge.propertyValuesInner()
+ } else {
+ val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp))
+ edge.propertyValues(queryOption.selectColumns) ++ initial
+ }
+
+ val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
+ edgeWithScore.copy(edge = newEdge)
+ }
+ } else Nil
+
+ // skip
+ if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores
+ else {
+ val degreeScore = 0.0
+
+ val sampled =
+ if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+ else edgeWithScores
+
+ val withScores = for {
+ edgeWithScore <- sampled
+ } yield {
+ val edge = edgeWithScore.edge
+ val edgeScore = edgeWithScore.score
+ val score = queryParam.scorePropagateOp match {
+ case "plus" => edgeScore + prevScore
+ case "divide" =>
+ if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+ else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
+ case _ => edgeScore * prevScore
+ }
+
+ val tsVal = processTimeDecay(queryParam, edge)
+ val newScore = degreeScore + score
+ // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
+ val newEdge = edge.copyParentEdges(parents)
+ edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
+ }
+
+ val normalized =
+ if (queryParam.shouldNormalize) normalize(withScores)
+ else withScores
+
+ normalized
+ }
+ }
+
+ def buildResult[R](query: Query,
+ stepIdx: Int,
+ stepResultLs: Seq[(QueryRequest, StepResult)],
+ parentEdges: Map[VertexId, Seq[EdgeWithScore]])
+ (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R)
+ (implicit ev: WithScore[R]): ListBuffer[R] = {
+ import scala.collection._
+
+ val results = ListBuffer.empty[R]
+ val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty
+ val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty
+ val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
+ val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
+
+ var numOfDuplicates = 0
+ val queryOption = query.queryOption
+ val step = query.steps(stepIdx)
+ val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet
+ val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet
+
+ stepResultLs.foreach { case (queryRequest, stepInnerResult) =>
+ val queryParam = queryRequest.queryParam
+ val label = queryParam.label
+ val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir)
+ val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir)
+
+ val propsSelectColumns = (for {
+ column <- queryOption.propsSelectColumns
+ labelMeta <- label.metaPropsInvMap.get(column)
+ } yield labelMeta)
+
+ for {
+ edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges)
+ } {
+ val edge = edgeWithScore.edge
+ val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
+ // params += (hashKey -> queryParam) //
+
+ /* check if this edge should be exlcuded. */
+ if (shouldBeExcluded) {
+ edgesToExclude.add(filterHashKey)
+ } else {
+ if (shouldBeIncluded) {
+ edgesToInclude.add(filterHashKey)
+ }
+ val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns)
+
+ sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
+ duplicates.get(hashKey) match {
+ case None =>
+ val newLs = ListBuffer.empty[(FilterHashKey, R)]
+ newLs += (filterHashKey -> newEdgeWithScore)
+ duplicates += (hashKey -> newLs) //
+ case Some(old) =>
+ numOfDuplicates += 1
+ old += (filterHashKey -> newEdgeWithScore) //
+ }
+ }
+ }
+ }
+
+
+ if (numOfDuplicates == 0) {
+ // no duplicates at all.
+ for {
+ (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs
+ if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+ } {
+ results += edgeWithScore
+ }
+ } else {
+ // need to resolve duplicates.
+ val seen = new mutable.HashSet[HashKey]()
+ for {
+ (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs
+ if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+ if !seen.contains(hashKey)
+ } {
+ // val queryParam = params(hashKey)
+ processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) =>
+ if (ev.score(duplicate) >= queryParam.threshold) {
+ seen += hashKey
+ results += duplicate
+ }
+ }
+ }
+ }
+ results
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/42b7702e/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 8bb95fc..82d0c6a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -22,6 +22,7 @@ package org.apache.s2graph.core
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.{Executors, TimeUnit}
+
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.commons.configuration.{BaseConfiguration, Configuration}
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
@@ -31,8 +32,9 @@ import org.apache.s2graph.core.index.IndexProvider
import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
-import org.apache.s2graph.core.storage.{ MutateResponse, SKeyValue, Storage}
+import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue, Storage}
import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.PostProcess._
import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies
@@ -42,13 +44,13 @@ import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables}
import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper}
import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex}
import play.api.libs.json.{JsObject, Json}
-import scala.annotation.tailrec
+
import scala.collection.JavaConversions._
import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
import scala.concurrent.duration.Duration
-import scala.util.{Random, Try}
+import scala.util.Try
object S2Graph {
@@ -156,386 +158,6 @@ object S2Graph {
}
ConfigFactory.parseMap(kvs)
}
-
- /** Global helper functions */
- @tailrec
- final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
- if (range < sampleNumber || set.size == sampleNumber) set
- else randomInt(sampleNumber, range, set + Random.nextInt(range))
- }
-
- def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
- if (edges.size <= n) {
- edges
- } else {
- val plainEdges = if (queryRequest.queryParam.offset == 0) {
- edges.tail
- } else edges
-
- val randoms = randomInt(n, plainEdges.size)
- var samples = List.empty[EdgeWithScore]
- var idx = 0
- plainEdges.foreach { e =>
- if (randoms.contains(idx)) samples = e :: samples
- idx += 1
- }
- samples
- }
- }
-
- def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
- val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
- edgeWithScores.map { edgeWithScore =>
- edgeWithScore.copy(score = edgeWithScore.score / sum)
- }
- }
-
- def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2VertexLike), Boolean] = {
- val vertices = for {
- edgeWithScore <- edgeWithScoreLs
- edge = edgeWithScore.edge
- vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
- } yield (edge.labelWithDir, vertex) -> true
-
- vertices.toMap
- }
-
- /** common methods for filter out, transform, aggregate queryResult */
- def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = {
- for {
- convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
- } yield convertedEdge
- }
-
- def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = {
- /* process time decay */
- val tsVal = queryParam.timeDecay match {
- case None => 1.0
- case Some(timeDecay) =>
- val tsVal = try {
- val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name)
- innerValWithTsOpt.map { innerValWithTs =>
- val innerVal = innerValWithTs.innerVal
- timeDecay.labelMeta.dataType match {
- case InnerVal.LONG => innerVal.value match {
- case n: BigDecimal => n.bigDecimal.longValue()
- case _ => innerVal.toString().toLong
- }
- case _ => innerVal.toString().toLong
- }
- } getOrElse (edge.ts)
- } catch {
- case e: Exception =>
- logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
- edge.ts
- }
- val timeDiff = queryParam.timestamp - tsVal
- timeDecay.decay(timeDiff)
- }
-
- tsVal
- }
-
- def processDuplicates[R](queryParam: QueryParam,
- duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = {
-
- if (queryParam.label.consistencyLevel != "strong") {
- //TODO:
- queryParam.duplicatePolicy match {
- case DuplicatePolicy.First => Seq(duplicates.head)
- case DuplicatePolicy.Raw => duplicates
- case DuplicatePolicy.CountSum =>
- val countSum = duplicates.size
- val (headFilterHashKey, headEdgeWithScore) = duplicates.head
- Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum))
- case _ =>
- val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) }
- val (headFilterHashKey, headEdgeWithScore) = duplicates.head
- Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum))
- }
- } else {
- duplicates
- }
- }
-
- def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = {
- val src = edge.srcVertex.innerId.hashCode()
- val tgt = edge.tgtVertex.innerId.hashCode()
- val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
- val filterHashKey = (src, tgt)
-
- (hashKey, filterHashKey)
- }
-
- def filterEdges(q: Query,
- stepIdx: Int,
- queryRequests: Seq[QueryRequest],
- queryResultLsFuture: Future[Seq[StepResult]],
- queryParams: Seq[QueryParam],
- alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean] = Map.empty,
- buildLastStepInnerResult: Boolean = true,
- parentEdges: Map[VertexId, Seq[EdgeWithScore]])
- (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = {
-
- queryResultLsFuture.map { queryRequestWithResultLs =>
- val (cursors, failCount) = {
- val _cursors = ArrayBuffer.empty[Array[Byte]]
- var _failCount = 0
-
- queryRequestWithResultLs.foreach { stepResult =>
- _cursors.append(stepResult.cursors: _*)
- _failCount += stepResult.failCount
- }
-
- _cursors -> _failCount
- }
-
-
- if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount)
- else {
- val isLastStep = stepIdx == q.steps.size - 1
- val queryOption = q.queryOption
- val step = q.steps(stepIdx)
-
- val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs)
- val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult
- val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
-
- if (shouldBuildInnerResults) {
- val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
- edgeWithScore
- }
-
- /* process step group by */
- val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
- StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
-
- } else {
- val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
- val edge = edgeWithScore.edge
- val score = edgeWithScore.score
- val label = edgeWithScore.label
-
- /* Select */
- val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
-
-// val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
- val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
-
- val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
- /* OrderBy */
- val orderByValues =
- if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
- else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
-
- /* StepGroupBy */
- val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
-
- /* GroupBy */
- val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
-
- /* FilterOut */
- val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
-
- newEdgeWithScore.copy(orderByValues = orderByValues,
- stepGroupByValues = stepGroupByValues,
- groupByValues = groupByValues,
- filterOutValues = filterOutValues)
- }
-
- /* process step group by */
- val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
-
- /* process ordered list */
- val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
-
- /* process grouped list */
- val grouped =
- if (queryOption.groupBy.keys.isEmpty) Nil
- else {
- val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]()
- results.groupBy { edgeWithScore =>
- // edgeWithScore.groupByValues.map(_.map(_.toString))
- edgeWithScore.groupByValues
- }.foreach { case (k, ls) =>
- val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption)
-
- val newScoreSum = scoreSum
-
- /*
- * watch out here. by calling toString on Any, we lose type information which will be used
- * later for toJson.
- */
- if (merged.nonEmpty) {
- val newKey = merged.head.groupByValues
- agg += ((newKey, (newScoreSum, merged)))
- }
- }
- agg.toSeq.sortBy(_._2._1 * -1)
- }
-
- StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount)
- }
- }
- }
- }
-
- private def toEdgeWithScores(queryRequest: QueryRequest,
- stepResult: StepResult,
- parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = {
- val queryOption = queryRequest.query.queryOption
- val queryParam = queryRequest.queryParam
- val prevScore = queryRequest.prevStepScore
- val labelWeight = queryRequest.labelWeight
- val edgeWithScores = stepResult.edgeWithScores
-
- val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
- val parents = if (shouldBuildParents) {
- parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore =>
- val edge = edgeWithScore.edge
- val score = edgeWithScore.score
- val label = edgeWithScore.label
-
- /* Select */
- val mergedPropsWithTs =
- if (queryOption.selectColumns.isEmpty) {
- edge.propertyValuesInner()
- } else {
- val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp))
- edge.propertyValues(queryOption.selectColumns) ++ initial
- }
-
- val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
- edgeWithScore.copy(edge = newEdge)
- }
- } else Nil
-
- // skip
- if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores
- else {
- val degreeScore = 0.0
-
- val sampled =
- if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
- else edgeWithScores
-
- val withScores = for {
- edgeWithScore <- sampled
- } yield {
- val edge = edgeWithScore.edge
- val edgeScore = edgeWithScore.score
- val score = queryParam.scorePropagateOp match {
- case "plus" => edgeScore + prevScore
- case "divide" =>
- if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
- else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
- case _ => edgeScore * prevScore
- }
-
- val tsVal = processTimeDecay(queryParam, edge)
- val newScore = degreeScore + score
- // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
- val newEdge = edge.copyParentEdges(parents)
- edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
- }
-
- val normalized =
- if (queryParam.shouldNormalize) normalize(withScores)
- else withScores
-
- normalized
- }
- }
-
- private def buildResult[R](query: Query,
- stepIdx: Int,
- stepResultLs: Seq[(QueryRequest, StepResult)],
- parentEdges: Map[VertexId, Seq[EdgeWithScore]])
- (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R)
- (implicit ev: WithScore[R]): ListBuffer[R] = {
- import scala.collection._
-
- val results = ListBuffer.empty[R]
- val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty
- val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty
- val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
- val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
-
- var numOfDuplicates = 0
- val queryOption = query.queryOption
- val step = query.steps(stepIdx)
- val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet
- val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet
-
- stepResultLs.foreach { case (queryRequest, stepInnerResult) =>
- val queryParam = queryRequest.queryParam
- val label = queryParam.label
- val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir)
- val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir)
-
- val propsSelectColumns = (for {
- column <- queryOption.propsSelectColumns
- labelMeta <- label.metaPropsInvMap.get(column)
- } yield labelMeta)
-
- for {
- edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges)
- } {
- val edge = edgeWithScore.edge
- val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
- // params += (hashKey -> queryParam) //
-
- /* check if this edge should be exlcuded. */
- if (shouldBeExcluded) {
- edgesToExclude.add(filterHashKey)
- } else {
- if (shouldBeIncluded) {
- edgesToInclude.add(filterHashKey)
- }
- val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns)
-
- sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
- duplicates.get(hashKey) match {
- case None =>
- val newLs = ListBuffer.empty[(FilterHashKey, R)]
- newLs += (filterHashKey -> newEdgeWithScore)
- duplicates += (hashKey -> newLs) //
- case Some(old) =>
- numOfDuplicates += 1
- old += (filterHashKey -> newEdgeWithScore) //
- }
- }
- }
- }
-
-
- if (numOfDuplicates == 0) {
- // no duplicates at all.
- for {
- (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs
- if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
- } {
- results += edgeWithScore
- }
- } else {
- // need to resolve duplicates.
- val seen = new mutable.HashSet[HashKey]()
- for {
- (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs
- if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
- if !seen.contains(hashKey)
- } {
- // val queryParam = params(hashKey)
- processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) =>
- if (ev.score(duplicate) >= queryParam.threshold) {
- seen += hashKey
- results += duplicate
- }
- }
- }
- }
- results
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/42b7702e/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index d0a59b2..3074b4e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.storage
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.S2Graph.{convertEdges, normalize, processTimeDecay, sample}
+import org.apache.s2graph.core.PostProcess._
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.parsers.WhereParser