You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 16:01:24 UTC
[3/3] incubator-s2graph git commit: [S2GRAPH-121]: Create `Result`
class to hold traverse result edges.
[S2GRAPH-121]: Create `Result` class to hold traverse result edges.
JIRA:
[S2GRAPH-121] https://issues.apache.org/jira/browse/S2GRAPH-121
Pull Request:
Closes #96
Authors
DO YUNG YOON: steamshon@apache.org
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/8dbb9a3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/8dbb9a3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/8dbb9a3e
Branch: refs/heads/master
Commit: 8dbb9a3eeff6f412fe5defee2baffee32f174981
Parents: b590831
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Nov 16 16:56:41 2016 +0100
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Nov 16 17:01:07 2016 +0100
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/s2graph/core/mysqls/schema.sql | 1 +
.../scala/org/apache/s2graph/core/Edge.scala | 82 ++-
.../apache/s2graph/core/ExceptionHandler.scala | 19 +-
.../scala/org/apache/s2graph/core/Graph.scala | 232 ++++---
.../apache/s2graph/core/GraphExceptions.scala | 2 +
.../org/apache/s2graph/core/Management.scala | 69 +--
.../org/apache/s2graph/core/OrderingUtil.scala | 7 +
.../org/apache/s2graph/core/PostProcess.scala | 601 ++++---------------
.../org/apache/s2graph/core/QueryParam.scala | 75 ++-
.../org/apache/s2graph/core/QueryResult.scala | 234 +++++++-
.../scala/org/apache/s2graph/core/Vertex.scala | 74 ++-
.../apache/s2graph/core/mysqls/Experiment.scala | 3 +-
.../org/apache/s2graph/core/mysqls/Label.scala | 59 +-
.../s2graph/core/mysqls/ServiceColumn.scala | 31 +-
.../s2graph/core/parsers/WhereParser.scala | 2 +-
.../s2graph/core/rest/RequestParser.scala | 177 ++++--
.../apache/s2graph/core/rest/RestHandler.scala | 182 +++---
.../apache/s2graph/core/storage/Storage.scala | 336 ++++++-----
.../core/storage/hbase/AsynchbaseStorage.scala | 147 +++--
.../indexedge/wide/IndexEdgeSerializable.scala | 2 +-
.../org/apache/s2graph/core/utils/Logger.scala | 6 +
.../org/apache/s2graph/core/EdgeTest.scala | 5 +-
.../s2graph/core/Integrate/CrudTest.scala | 163 +++--
.../core/Integrate/IntegrateCommon.scala | 10 +-
.../s2graph/core/Integrate/QueryTest.scala | 159 ++---
.../core/Integrate/StrongLabelDeleteTest.scala | 14 +-
.../core/Integrate/VertexTestHelper.scala | 7 +-
.../core/Integrate/WeakLabelDeleteTest.scala | 9 +-
.../apache/s2graph/core/JsonParserTest.scala | 3 +-
.../apache/s2graph/core/QueryParamTest.scala | 56 +-
.../s2graph/core/TestCommonWithModels.scala | 2 +-
.../core/benchmark/JsonBenchmarkSpec.scala | 62 +-
.../core/benchmark/SamplingBenchmarkSpec.scala | 207 +++----
.../org/apache/s2graph/rest/netty/Server.scala | 96 ++-
.../apache/s2graph/rest/play/Bootstrap.scala | 4 +-
.../s2graph/rest/play/config/Config.scala | 4 +
.../controllers/ApplicationController.scala | 6 +-
.../play/controllers/CounterController.scala | 21 +-
.../rest/play/controllers/EdgeController.scala | 164 +++--
.../play/controllers/ExperimentController.scala | 5 +-
.../play/controllers/PublishController.scala | 8 -
.../rest/play/controllers/QueryController.scala | 21 +-
.../play/controllers/VertexController.scala | 18 +-
s2rest_play/conf/reference.conf | 1 +
s2rest_play/conf/routes | 7 +-
46 files changed, 1892 insertions(+), 1503 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0c8edc3..73b3597 100644
--- a/CHANGES
+++ b/CHANGES
@@ -94,6 +94,8 @@ Release 0.1.0 - unreleased
S2GRAPH-127: Refactor ExceptionHandler Object into Class (Committed by DOYUNG YOON).
+ S2GRAPH-121: Create `Result` class to hold traverse result edges (Committed by DOYUNG YOON).
+
BUG FIXES
S2GRAPH-18: Query Option "interval" is Broken.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
index d8ee5bc..822df6c 100644
--- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
+++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
@@ -96,6 +96,7 @@ CREATE TABLE `labels` (
`is_async` tinyint(4) NOT NULL default '0',
`compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4',
`options` text,
+ `deleted_at` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_label` (`label`),
INDEX `idx_labels_src_column_name` (`src_column_name`),
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 4d4afe0..97730f3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -19,10 +19,11 @@
package org.apache.s2graph.core
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.JSONParser._
import play.api.libs.json.{JsNumber, Json}
import scala.collection.JavaConversions._
import scala.util.hashing.MurmurHash3
@@ -112,7 +113,7 @@ case class IndexEdge(srcVertex: Vertex,
lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v.innerVal
- // lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
+// lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
//TODO:
// lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList
@@ -151,8 +152,23 @@ case class Edge(srcVertex: Vertex,
val schemaVer = label.schemaVersion
val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong
+ lazy val srcId = srcVertex.innerIdVal
+ lazy val tgtId = tgtVertex.innerIdVal
+ lazy val labelName = label.label
+ lazy val direction = GraphUtil.fromDirection(labelWithDir.dir)
+ lazy val properties = toProps()
+
def props = propsWithTs.mapValues(_.innerVal)
+
+ private def toProps(): Map[String, Any] = {
+ for {
+ (labelMeta, defaultVal) <- label.metaPropsDefaultMapInner
+ } yield {
+ labelMeta.name -> propsWithTs.getOrElse(labelMeta.seq, defaultVal).innerVal.value
+ }
+ }
+
def relatedEdges = {
if (labelWithDir.isDirected) {
val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
@@ -204,10 +220,10 @@ case class Edge(srcVertex: Vertex,
def isDegree = propsWithTs.contains(LabelMeta.degreeSeq)
- // def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
- // case Some(_) => props
- // case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
- // }
+// def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
+// case Some(_) => props
+// case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
+// }
def propsPlusTsValid = propsWithTs.filter(kv => kv._1 >= 0)
@@ -290,8 +306,31 @@ case class Edge(srcVertex: Vertex,
ret.mkString("\t")
}
+
+ def selectValues(selectColumns: Seq[String],
+ useToString: Boolean = true,
+ score: Double = 0.0): Seq[Option[Any]] = {
+ //TODO: Option should be matched in JsonParser anyTo*
+ for {
+ selectColumn <- selectColumns
+ } yield {
+ val valueOpt = selectColumn match {
+ case LabelMeta.from.name | "from" => Option(srcId)
+ case LabelMeta.to.name | "to" => Option(tgtId)
+ case "label" => Option(labelName)
+ case "direction" => Option(direction)
+ case "score" => Option(score)
+ case LabelMeta.timestamp.name | "timestamp" => Option(ts)
+ case _ =>
+ properties.get(selectColumn)
+ }
+ if (useToString) valueOpt.map(_.toString)
+ else valueOpt
+ }
+ }
}
+
case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
newSnapshotEdge: Option[SnapshotEdge] = None) {
@@ -310,6 +349,33 @@ object Edge {
val incrementVersion = 1L
val minTsVal = 0L
+ def toEdge(srcId: Any,
+ tgtId: Any,
+ labelName: String,
+ direction: String,
+ props: Map[String, Any] = Map.empty,
+ ts: Long = System.currentTimeMillis(),
+ operation: String = "insert"): Edge = {
+ val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+
+ val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion)
+ val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion)
+
+ val srcColId = label.srcColumn.id.get
+ val tgtColId = label.tgtColumn.id.get
+
+ val srcVertex = Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis())
+ val tgtVertex = Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis())
+ val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+
+ val labelWithDir = LabelWithDirection(label.id.get, dir)
+ val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
+ val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+ val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+ new Edge(srcVertex, tgtVertex, labelWithDir, op = op, version = ts, propsWithTs = propsWithTs)
+ }
+
/** now version information is required also **/
type State = Map[Byte, InnerValLikeWithTs]
type PropsPairWithTs = (State, State, Long, String)
@@ -387,7 +453,7 @@ object Edge {
// logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
// logger.error(s"$propsWithTs")
- (requestEdge, edgeMutate)
+ (requestEdge.copy(propsWithTs = propsWithTs), edgeMutate)
}
}
@@ -582,7 +648,7 @@ object Edge {
(propsWithTs, true)
}
- def fromString(s: String): Option[Edge] = Graph.toEdge(s)
+// def fromString(s: String): Option[Edge] = Graph.toEdge(s)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index d03d483..48976d3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -24,8 +24,11 @@ import java.util.Properties
import com.typesafe.config.Config
import org.apache.kafka.clients.producer._
import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.JsValue
class ExceptionHandler(config: Config) {
+
+
import ExceptionHandler._
val keyBrokerList = "kafka.metadata.broker.list"
@@ -43,7 +46,6 @@ class ExceptionHandler(config: Config) {
}
} else None
-
def enqueue(m: KafkaMessage): Unit = {
producer match {
case None => logger.debug(s"skip log to Kafka: ${m}")
@@ -68,24 +70,33 @@ object ExceptionHandler {
type Key = String
type Val = String
- def toKafkaMessage(topic: String, element: GraphElement, originalString: Option[String] = None) = {
+ def toKafkaMessage(topic: String,
+ element: GraphElement,
+ originalString: Option[String] = None,
+ produceJson: Boolean = false) = {
+ val edgeString = originalString.getOrElse(element.toLogString())
+ val msg = edgeString
+
KafkaMessage(
new ProducerRecord[Key, Val](
topic,
element.queuePartitionKey,
- originalString.getOrElse(element.toLogString())))
+ msg))
}
+ // only used in deleteAll
def toKafkaMessage(topic: String, tsv: String) = {
KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
}
+ def toKafkaMessage(topic: String, jsValue: JsValue): KafkaMessage = toKafkaMessage(topic, jsValue.toString())
+
case class KafkaMessage(msg: ProducerRecord[Key, Val])
private def toKafkaProp(config: Config) = {
val props = new Properties()
- /* all default configuration for new producer */
+ /** all default configuration for new producer */
val brokers =
if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
else "localhost"
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index c25b71a..2cbddea 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -23,11 +23,13 @@ import java.util
import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{Label, Model}
import org.apache.s2graph.core.parsers.WhereParser
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.{JsObject, Json}
import scala.collection.JavaConversions._
import scala.collection._
@@ -58,6 +60,7 @@ object Graph {
"back.off.timeout" -> java.lang.Integer.valueOf(1000),
"hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
"delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
+ "delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
"future.cache.max.size" -> java.lang.Integer.valueOf(100000),
"future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
@@ -84,10 +87,9 @@ object Graph {
(hashKey, filterHashKey)
}
- def alreadyVisitedVertices(queryResultLs: Seq[QueryResult]): Map[(LabelWithDirection, Vertex), Boolean] = {
+ def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, Vertex), Boolean] = {
val vertices = for {
- queryResult <- queryResultLs
- edgeWithScore <- queryResult.edgeWithScoreLs
+ edgeWithScore <- edgeWithScoreLs
edge = edgeWithScore.edge
vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
} yield (edge.labelWithDir, vertex) -> true
@@ -132,81 +134,36 @@ object Graph {
tsVal
}
- def aggregateScore(newScore: Double,
- resultEdges: ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
- duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
- edgeWithScoreSorted: ListBuffer[(HashKey, FilterHashKey, Edge, Double)],
- hashKey: HashKey,
- filterHashKey: FilterHashKey,
- queryParam: QueryParam,
- convertedEdge: Edge) = {
-
- /* skip duplicate policy check if consistencyLevel is strong */
- if (queryParam.label.consistencyLevel != "strong" && resultEdges.containsKey(hashKey)) {
- val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey)
+ def processDuplicates(queryParam: QueryParam,
+ duplicates: Seq[(FilterHashKey, EdgeWithScore)]): Seq[(FilterHashKey, EdgeWithScore)] = {
+
+ if (queryParam.label.consistencyLevel != "strong") {
//TODO:
queryParam.duplicatePolicy match {
- case Query.DuplicatePolicy.First => // do nothing
- case Query.DuplicatePolicy.Raw =>
- if (duplicateEdges.containsKey(hashKey)) {
- duplicateEdges.get(hashKey).append(convertedEdge -> newScore)
- } else {
- val newBuffer = new ListBuffer[(Edge, Double)]
- newBuffer.append(convertedEdge -> newScore)
- duplicateEdges.put(hashKey, newBuffer)
- }
+ case Query.DuplicatePolicy.First => Seq(duplicates.head)
+ case Query.DuplicatePolicy.Raw => duplicates
case Query.DuplicatePolicy.CountSum =>
- resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 1))
+ val countSum = duplicates.size
+ Seq(duplicates.head._1 -> duplicates.head._2.copy(score = countSum))
case _ =>
- resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + newScore))
+ val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + current._2.score }
+ Seq(duplicates.head._1 -> duplicates.head._2.copy(score = scoreSum))
}
} else {
- resultEdges.put(hashKey, (filterHashKey, convertedEdge, newScore))
- edgeWithScoreSorted.append((hashKey, filterHashKey, convertedEdge, newScore))
+ duplicates
}
}
-
- def aggregateResults(queryRequestWithResult: QueryRequestWithResult,
- queryParamResult: Result,
- edgesToInclude: util.HashSet[FilterHashKey],
- edgesToExclude: util.HashSet[FilterHashKey]): QueryRequestWithResult = {
- val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- val (query, stepIdx, _, queryParam) = QueryRequest.unapply(queryRequest).get
-
- val (duplicateEdges, resultEdges, edgeWithScoreSorted) = queryParamResult
- val edgesWithScores = for {
- (hashKey, filterHashKey, edge, _) <- edgeWithScoreSorted if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
- score = resultEdges.get(hashKey)._3
- (duplicateEdge, aggregatedScore) <- fetchDuplicatedEdges(edge, score, hashKey, duplicateEdges) if aggregatedScore >= queryParam.threshold
- } yield EdgeWithScore(duplicateEdge, aggregatedScore)
-
- QueryRequestWithResult(queryRequest, QueryResult(edgesWithScores))
- }
-
- def fetchDuplicatedEdges(edge: Edge,
- score: Double,
- hashKey: HashKey,
- duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]) = {
- (edge -> score) +: (if (duplicateEdges.containsKey(hashKey)) duplicateEdges.get(hashKey) else Seq.empty)
- }
-
- def queryResultWithFilter(queryRequestWithResult: QueryRequestWithResult) = {
- val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- val (_, _, _, queryParam) = QueryRequest.unapply(queryRequest).get
- val whereFilter = queryParam.where.get
- if (whereFilter == WhereParser.success) queryResult.edgeWithScoreLs
- else queryResult.edgeWithScoreLs.withFilter(edgeWithScore => whereFilter.filter(edgeWithScore.edge))
- }
-
- def filterEdges(queryResultLsFuture: Future[Seq[QueryRequestWithResult]],
+ def filterEdges(q: Query,
+ stepIdx: Int,
+ queryRequests: Seq[QueryRequest],
+ queryResultLsFuture: Future[Seq[StepInnerResult]],
+ queryParams: Seq[QueryParam],
alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean])
- (implicit ec: scala.concurrent.ExecutionContext): Future[Seq[QueryRequestWithResult]] = {
+ (implicit ec: scala.concurrent.ExecutionContext): Future[StepInnerResult] = {
queryResultLsFuture.map { queryRequestWithResultLs =>
- if (queryRequestWithResultLs.isEmpty) Nil
+ if (queryRequestWithResultLs.isEmpty) StepInnerResult.Empty
else {
- val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get
- val (q, stepIdx, srcVertex, queryParam) = QueryRequest.unapply(queryRequest).get
val step = q.steps(stepIdx)
val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None
@@ -219,73 +176,114 @@ object Graph {
val edgesToExclude = new util.HashSet[FilterHashKey]()
val edgesToInclude = new util.HashSet[FilterHashKey]()
- val queryParamResultLs = new ListBuffer[Result]
- queryRequestWithResultLs.foreach { queryRequestWithResult =>
- val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+ val sequentialLs = new ListBuffer[(HashKey, FilterHashKey, EdgeWithScore)]()
+ val agg = new mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, EdgeWithScore)]]()
+ val params = new mutable.HashMap[HashKey, QueryParam]()
+ var numOfDuplicates = 0
+ var numOfTotal = 0
+ queryRequests.zip(queryRequestWithResultLs).foreach { case (queryRequest, stepInnerResult) =>
val queryParam = queryRequest.queryParam
- val duplicateEdges = new util.concurrent.ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]()
- val resultEdges = new util.concurrent.ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)]()
- val edgeWithScoreSorted = new ListBuffer[(HashKey, FilterHashKey, Edge, Double)]
val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
-
- // store degree value with Array.empty so if degree edge exist, it comes at very first.
- def checkDegree() = queryResult.edgeWithScoreLs.headOption.exists { edgeWithScore =>
- edgeWithScore.edge.isDegree
- }
- var isDegree = checkDegree()
-
val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir
val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey)
val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey)
-
- queryResultWithFilter(queryRequestWithResult).foreach { edgeWithScore =>
- val (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+ val where = queryParam.where.get
+
+ for {
+ edgeWithScore <- stepInnerResult.edgesWithScoreLs
+ if where == WhereParser.success || where.filter(edgeWithScore.edge)
+ (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+ } {
+ numOfTotal += 1
if (queryParam.transformer.isDefault) {
val convertedEdge = edge
- val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+ val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
- /* check if this edge should be exlcuded. */
- if (shouldBeExcluded && !isDegree) {
+ /** check if this edge should be excluded. */
+ if (shouldBeExcluded) {
edgesToExclude.add(filterHashKey)
} else {
- if (shouldBeIncluded && !isDegree) {
+ if (shouldBeIncluded) {
edgesToInclude.add(filterHashKey)
}
val tsVal = processTimeDecay(queryParam, convertedEdge)
val newScore = labelWeight * score * tsVal
- aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+ val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
+ sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
+ agg.get(hashKey) match {
+ case None =>
+ val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
+ newLs += (filterHashKey -> newEdgeWithScore)
+ agg += (hashKey -> newLs)
+ case Some(old) =>
+ numOfDuplicates += 1
+ old += (filterHashKey -> newEdgeWithScore)
+ }
+ params += (hashKey -> queryParam)
}
} else {
convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge =>
- val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+ val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
- /* check if this edge should be exlcuded. */
- if (shouldBeExcluded && !isDegree) {
+ /** check if this edge should be excluded. */
+ if (shouldBeExcluded) {
edgesToExclude.add(filterHashKey)
} else {
- if (shouldBeIncluded && !isDegree) {
+ if (shouldBeIncluded) {
edgesToInclude.add(filterHashKey)
}
val tsVal = processTimeDecay(queryParam, convertedEdge)
val newScore = labelWeight * score * tsVal
- aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+ val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
+ sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
+ agg.get(hashKey) match {
+ case None =>
+ val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
+ newLs += (filterHashKey -> newEdgeWithScore)
+ agg += (hashKey -> newLs)
+ case Some(old) =>
+ numOfDuplicates += 1
+ old += (filterHashKey -> newEdgeWithScore)
+ }
+ params += (hashKey -> queryParam)
}
}
}
- isDegree = false
}
- val ret = (duplicateEdges, resultEdges, edgeWithScoreSorted)
- queryParamResultLs.append(ret)
}
- val aggregatedResults = for {
- (queryRequestWithResult, queryParamResult) <- queryRequestWithResultLs.zip(queryParamResultLs)
- } yield {
- aggregateResults(queryRequestWithResult, queryParamResult, edgesToInclude, edgesToExclude)
+
+ val edgeWithScoreLs = new ListBuffer[EdgeWithScore]()
+ if (numOfDuplicates == 0) {
+ // no duplicates at all.
+ for {
+ (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
+ if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+ } {
+ edgeWithScoreLs += edgeWithScore
}
+ } else {
+ // need to resolve duplicates.
+ val seen = new mutable.HashSet[HashKey]()
+ for {
+ (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
+ if !seen.contains(hashKey)
+ if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+ } {
+ val queryParam = params(hashKey)
+ val duplicates = processDuplicates(queryParam, agg(hashKey))
+ duplicates.foreach { case (_, duplicate) =>
+ if (duplicate.score >= queryParam.threshold) {
+ seen += hashKey
+ edgeWithScoreLs += duplicate
+ }
+ }
+ }
+ }
- aggregatedResults
+ val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
+ StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, degreeEdges = degrees)
}
}
}
@@ -310,7 +308,7 @@ object Graph {
element
} recover {
case e: Exception =>
- logger.error(s"$e", e)
+ logger.error(s"[toElement]: $e", e)
None
} get
@@ -323,37 +321,33 @@ object Graph {
toEdge(GraphUtil.split(s))
}
- //"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}"
- //{"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":1417616431},
def toEdge(parts: Array[String]): Option[Edge] = Try {
val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
- val props = if (parts.length >= 7) parts(6) else "{}"
+ val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
val tempDirection = if (parts.length >= 8) parts(7) else "out"
val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
-
- val edge = Management.toEdge(ts.toLong, operation, srcId, tgtId, label, direction, props)
- // logger.debug(s"toEdge: $edge")
- Some(edge)
+ val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
+ Option(edge)
} recover {
case e: Exception =>
- logger.error(s"toEdge: $e", e)
+ logger.error(s"[toEdge]: $e", e)
throw e
} get
- //"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}"
def toVertex(parts: Array[String]): Option[Vertex] = Try {
val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
- val props = if (parts.length >= 7) parts(6) else "{}"
- Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, colName, props))
+ val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
+ val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
+ Option(vertex)
} recover {
case e: Throwable =>
- logger.error(s"toVertex: $e", e)
+ logger.error(s"[toVertex]: $e", e)
throw e
} get
- def initStorage(config: Config)(ec: ExecutionContext) = {
+ def initStorage(graph: Graph, config: Config)(ec: ExecutionContext) = {
config.getString("s2graph.storage.backend") match {
- case "hbase" => new AsynchbaseStorage(config)(ec)
+ case "hbase" => new AsynchbaseStorage(graph, config)(ec)
case _ => throw new RuntimeException("not supported storage.")
}
}
@@ -366,7 +360,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
Model.loadCache()
// TODO: Make storage client by config param
- val storage = Graph.initStorage(config)(ec)
+ val storage = Graph.initStorage(this, config)(ec)
for {
@@ -375,9 +369,11 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
} logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
/** select */
- def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = storage.checkEdges(params)
+ def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = storage.checkEdges(params)
+
+ def getEdges(q: Query): Future[StepResult] = storage.getEdges(q)
- def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = storage.getEdges(q)
+ def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = storage.getEdgesMultiQuery(mq)
def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
index 0898ffa..2f090cf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
@@ -31,6 +31,8 @@ object GraphExceptions {
case class LabelAlreadyExistException(msg: String) extends Exception(msg)
+ case class LabelNameTooLongException(msg: String) extends Exception(msg)
+
case class InternalException(msg: String) extends Exception(msg)
case class IllegalDataTypeException(msg: String) extends Exception(msg)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 39020c7..9ef7c14 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core
-import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
+import org.apache.s2graph.core.GraphExceptions.{LabelNameTooLongException, InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types.HBaseType._
@@ -48,9 +48,9 @@ object Management {
import HBaseType._
+ val LABEL_NAME_MAX_LENGTH = 100
val DefaultCompressionAlgorithm = "gz"
-
def findService(serviceName: String) = {
Service.findByName(serviceName, useCache = false)
}
@@ -190,62 +190,6 @@ object Management {
}
}
- def toEdge(ts: Long, operation: String, srcId: String, tgtId: String,
- labelStr: String, direction: String = "", props: String): Edge = {
-
- val label = tryOption(labelStr, getServiceLabel)
- val dir =
- if (direction == "")
-// GraphUtil.toDirection(label.direction)
- GraphUtil.directions("out")
- else
- GraphUtil.toDirection(direction)
-
- // logger.debug(s"$srcId, ${label.srcColumnWithDir(dir)}")
- // logger.debug(s"$tgtId, ${label.tgtColumnWithDir(dir)}")
-
- val srcVertexId = toInnerVal(srcId, label.srcColumn.columnType, label.schemaVersion)
- val tgtVertexId = toInnerVal(tgtId, label.tgtColumn.columnType, label.schemaVersion)
-
- val srcColId = label.srcColumn.id.get
- val tgtColId = label.tgtColumn.id.get
- val (srcVertex, tgtVertex) = if (dir == GraphUtil.directions("out")) {
- (Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()),
- Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()))
- } else {
- (Vertex(SourceVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()),
- Vertex(TargetVertexId(srcColId, srcVertexId), System.currentTimeMillis()))
- }
-
- // val dir = if (direction == "") GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction)
- val labelWithDir = LabelWithDirection(label.id.get, dir)
- val op = tryOption(operation, GraphUtil.toOp)
-
- val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
- val parsedProps = toProps(label, jsObject.fields).toMap
- val propsWithTs = parsedProps.map(kv => (kv._1 -> InnerValLikeWithTs(kv._2, ts))) ++
- Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, label.schemaVersion), ts))
-
- Edge(srcVertex, tgtVertex, labelWithDir, op, version = ts, propsWithTs = propsWithTs)
-
- }
-
- def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = {
- Service.findByName(serviceName) match {
- case None => throw new RuntimeException(s"$serviceName does not exist. create service first.")
- case Some(service) =>
- ServiceColumn.find(service.id.get, columnName) match {
- case None => throw new RuntimeException(s"$columnName is not exist. create service column first.")
- case Some(col) =>
- val idVal = toInnerVal(id, col.columnType, col.schemaVersion)
- val op = tryOption(operation, GraphUtil.toOp)
- val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
- val parsedProps = toProps(col, jsObject).toMap
- Vertex(VertexId(col.id.get, idVal), ts, parsedProps, op = op)
- }
- }
- }
-
def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = {
val props = for {
@@ -346,7 +290,7 @@ class Management(graph: Graph) {
schemaVersion: String = DEFAULT_VERSION,
isAsync: Boolean,
compressionAlgorithm: String = "gz",
- options: Option[String] = None): Try[Label] = {
+ options: Option[String]): Try[Label] = {
val labelOpt = Label.findByName(label, useCache = false)
@@ -355,7 +299,8 @@ class Management(graph: Graph) {
case Some(l) =>
throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
case None =>
- /* create all models */
+ /** create all models */
+ if (label.length > LABEL_NAME_MAX_LENGTH) throw new LabelNameTooLongException(s"Label name $label too long. (max length: $LABEL_NAME_MAX_LENGTH)")
val newLabel = Label.insertAll(label,
srcServiceName, srcColumnName, srcColumnType,
tgtServiceName, tgtColumnName, tgtColumnType,
@@ -387,8 +332,8 @@ class Management(graph: Graph) {
* copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
*/
def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
- val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists."))
- if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
+ val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
+ if (Label.findByName(newLabelName, useCache = false).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
val allIndices = old.indices.map { index => Index(index.name, index.propNames) }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
index 0ecbf4e..eaadb39 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
@@ -49,8 +49,15 @@ object OrderingUtil {
case (xv: Long, yv: Long) => implicitly[Ordering[Long]].compare(xv, yv)
case (xv: Double, yv: Double) => implicitly[Ordering[Double]].compare(xv, yv)
case (xv: String, yv: String) => implicitly[Ordering[String]].compare(xv, yv)
+ case (xv: BigDecimal, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(xv, yv)
case (xv: JsValue, yv: JsValue) => implicitly[Ordering[JsValue]].compare(xv, yv)
case (xv: InnerValLike, yv: InnerValLike) => implicitly[Ordering[InnerValLike]].compare(xv, yv)
+ case (xv: BigDecimal, yv: Long) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+ case (xv: Long, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
+ case (xv: BigDecimal, yv: Int) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+ case (xv: Int, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
+ case (xv: BigDecimal, yv: Double) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+ case (xv: Double, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 9ab08b3..7cc2420 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -20,11 +20,13 @@
package org.apache.s2graph.core
import org.apache.s2graph.core.GraphExceptions.BadQueryException
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
-import play.api.libs.json.{Json, _}
import org.apache.s2graph.core.JSONParser._
-import scala.collection.mutable.ListBuffer
+import play.api.libs.json.{Json, _}
+
+import scala.collection.immutable
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
object PostProcess {
@@ -32,6 +34,7 @@ object PostProcess {
type EDGE_VALUES = Map[String, JsValue]
type ORDER_BY_VALUES = (Any, Any, Any, Any)
type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES)
+ type GROUP_BY_KEY = Map[String, JsValue]
/**
* Result Entity score field name
@@ -47,523 +50,153 @@ object PostProcess {
val SCORE_FIELD_NAME = "scoreSum"
val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props")
- def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
- val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
- // filterNot {case (edge, score) => edge.props.contains(LabelMeta.degreeSeq)}
- val groupedEdgesWithRank = (for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields))
- } yield {
- (queryRequest.queryParam, edge, score)
- }).groupBy {
- case (queryParam, edge, rank) if edge.labelWithDir.dir == GraphUtil.directions("in") =>
- (queryParam.label.srcColumn, queryParam.label.label, queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree)
- case (queryParam, edge, rank) =>
- (queryParam.label.tgtColumn, queryParam.label.label, queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree)
- }
- val ret = for {
- ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) <- groupedEdgesWithRank if !isDegreeEdge
- edgesWithRanks = edgesAndRanks.groupBy(x => x._2.srcVertex).map(_._2.head)
- id <- innerValToJsValue(target, tgtColumn.columnType)
- } yield {
- Json.obj("name" -> tgtColumn.columnName, "id" -> id,
- SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum,
- "label" -> labelName,
- "aggr" -> Json.obj(
- "name" -> srcColumn.columnName,
- "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) =>
- innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)
- },
- "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) =>
- Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType),
- "props" -> propsToJson(edge),
- "score" -> rank
- )
- }
- )
- )
- }
-
- ret.toList
- }
-
- def sortWithFormatted(jsons: Seq[JsObject],
- scoreField: String = "scoreSum",
- queryRequestWithResultLs: Seq[QueryRequestWithResult],
- decrease: Boolean = true): JsObject = {
- val ordering = if (decrease) -1 else 1
- val sortedJsons = jsons.sortBy { jsObject => (jsObject \ scoreField).as[Double] * ordering }
-
- if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
- else Json.obj(
- "size" -> sortedJsons.size,
- "results" -> sortedJsons,
- "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId()
- )
- }
-
- private def toHashKey(edge: Edge, queryParam: QueryParam, fields: Seq[String], delimiter: String = ","): Int = {
- val ls = for {
- field <- fields
- } yield {
- field match {
- case "from" | "_from" => edge.srcVertex.innerId.toIdString()
- case "to" | "_to" => edge.tgtVertex.innerId.toIdString()
- case "label" => edge.labelWithDir.labelId
- case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
- case "_timestamp" | "timestamp" => edge.ts
- case _ =>
- queryParam.label.metaPropsInvMap.get(field) match {
- case None => throw new RuntimeException(s"unknow column: $field")
- case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match {
- case None => labelMeta.defaultValue
- case Some(propVal) => propVal
- }
- }
- }
+ def s2EdgeParent(graph: Graph,
+ parentEdges: Seq[EdgeWithScore]): JsValue = {
+ if (parentEdges.isEmpty) JsNull
+ else {
+ val ancestors = for {
+ current <- parentEdges
+ parents = s2EdgeParent(graph, current.edge.parentEdges) if parents != JsNull
+ } yield {
+ val s2Edge = current.edge.originalEdgeOpt.getOrElse(current.edge)
+ s2EdgeToJsValue(s2Edge, current.score, false, parents = parents)
+ }
+ Json.toJson(ancestors)
}
- val ret = ls.hashCode()
- ret
- }
-
- def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], isSrcVertex: Boolean = false): Seq[Int] = {
- for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- q = queryRequest.query
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- } yield toHashKey(edge, queryRequest.queryParam, q.filterOutFields)
- }
-
- def summarizeWithListExcludeFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
- val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
- sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs, decrease = true)
}
- def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
- val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
- sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs)
- }
-
- def summarizeWithListFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
- val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
- sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs)
- }
-
- def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult]): JsValue = {
- toSimpleVertexArrJson(queryRequestWithResultLs, Seq.empty[QueryRequestWithResult])
- }
-
- private def orderBy(queryOption: QueryOption,
- rawEdges: ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]): ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)] = {
- import OrderingUtil._
-
- if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
- val ascendingLs = queryOption.orderByColumns.map(_._2)
- rawEdges.sortBy(_._3)(TupleMultiOrdering[Any](ascendingLs))
+ def s2EdgeToJsValue(s2Edge: Edge,
+ score: Double,
+ isDegree: Boolean = false,
+ parents: JsValue = JsNull): JsValue = {
+ if (isDegree) {
+ Json.obj(
+ "from" -> anyValToJsValue(s2Edge.srcId),
+ "label" -> s2Edge.labelName,
+ LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degreeSeq).innerVal.value)
+ )
} else {
- rawEdges
+ Json.obj("from" -> anyValToJsValue(s2Edge.srcId),
+ "to" -> anyValToJsValue(s2Edge.tgtId),
+ "label" -> s2Edge.labelName,
+ "score" -> score,
+ "props" -> JSONParser.propertiesToJson(s2Edge.properties),
+ "direction" -> s2Edge.direction,
+ "timestamp" -> anyValToJsValue(s2Edge.ts),
+ "parents" -> parents
+ )
}
}
- private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, edge: Edge, column: String): Any = {
- column match {
- case "score" => score
- case "timestamp" | "_timestamp" => edge.ts
- case _ =>
- keyWithJs.get(column) match {
- case None => keyWithJs.get("props").map { js => (js \ column).as[JsValue] }.get
- case Some(x) => x
- }
+ def withImpressionId(queryOption: QueryOption,
+ size: Int,
+ degrees: Seq[JsValue],
+ results: Seq[JsValue]): JsValue = {
+ queryOption.impIdOpt match {
+ case None => Json.obj(
+ "size" -> size,
+ "degrees" -> degrees,
+ "results" -> results
+ )
+ case Some(impId) =>
+ Json.obj(
+ "size" -> size,
+ "degrees" -> degrees,
+ "results" -> results,
+ Experiment.ImpressionKey -> impId
+ )
}
}
+ def s2VertexToJson(s2Vertex: Vertex): Option[JsValue] = {
+ val props = for {
+ (k, v) <- s2Vertex.properties
+ jsVal <- anyValToJsValue(v)
+ } yield k -> jsVal
- private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): JsValue = {
- def traverse(js: JsValue): JsValue = js match {
- case JsNull => mapper(JsNull)
- case JsNumber(v) => mapper(js)
- case JsString(v) => mapper(js)
- case JsBoolean(v) => mapper(js)
- case JsArray(elements) => JsArray(elements.map { t => traverse(mapper(t)) })
- case JsObject(values) => JsObject(values.map { case (k, v) => k -> traverse(mapper(v)) })
+ for {
+ id <- anyValToJsValue(s2Vertex.innerIdVal)
+ } yield {
+ Json.obj(
+ "serviceName" -> s2Vertex.serviceName,
+ "columnName" -> s2Vertex.columnName,
+ "id" -> id,
+ "props" -> Json.toJson(props),
+ "timestamp" -> s2Vertex.ts
+ )
}
-
- traverse(jsValue)
}
- /** test query with filterOut is not working since it can not diffrentate filterOut */
- private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): JsValue = {
- val cursors = _cursors.flatten.iterator
+ def verticesToJson(s2Vertices: Seq[Vertex]): JsValue =
+ Json.toJson(s2Vertices.flatMap(s2VertexToJson(_)))
- buildReplaceJson(jsonQuery) {
- case js@JsObject(fields) =>
- val isStep = fields.find { case (k, _) => k == "label" } // find label group
- if (isStep.isEmpty) js
- else {
- // TODO: Order not ensured
- val withCursor = js.fieldSet | Set("cursor" -> JsString(cursors.next))
- JsObject(withCursor.toSeq)
- }
- case js => js
- }
- }
+ def withOptionalFields(queryOption: QueryOption,
+ size: Int,
+ degrees: Seq[JsValue],
+ results: Seq[JsValue],
+ failCount: Int = 0,
+ cursors: => JsValue,
+ nextQuery: => Option[JsValue]): JsValue = {
- private def buildRawEdges(queryOption: QueryOption,
- queryRequestWithResultLs: Seq[QueryRequestWithResult],
- excludeIds: Map[Int, Boolean],
- scoreWeight: Double = 1.0): (ListBuffer[JsValue], ListBuffer[RAW_EDGE]) = {
- val degrees = ListBuffer[JsValue]()
- val rawEdges = ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]()
- val metaPropNamesMap = scala.collection.mutable.Map[String, Int]()
- for {
- queryRequestWithResult <- queryRequestWithResultLs
- } yield {
- val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- queryRequest.queryParam.label.metaPropNames.foreach { metaPropName =>
- metaPropNamesMap.put(metaPropName, metaPropNamesMap.getOrElse(metaPropName, 0) + 1)
- }
- }
- val propsExistInAll = metaPropNamesMap.filter(kv => kv._2 == queryRequestWithResultLs.length)
- val orderByColumns = queryOption.orderByColumns.filter { case (column, _) =>
- column match {
- case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => true
- case _ =>
- propsExistInAll.contains(column)
-// //TODO??
-// false
-// queryParam.label.metaPropNames.contains(column)
- }
- }
+ val kvs = new ArrayBuffer[(String, JsValue)]()
- /* build result jsons */
- for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- queryParam = queryRequest.queryParam
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- hashKey = toHashKey(edge, queryRequest.queryParam, queryOption.filterOutFields)
- if !excludeIds.contains(hashKey)
- } {
+ kvs.append("size" -> JsNumber(size))
+ kvs.append("degrees" -> JsArray(degrees))
+ kvs.append("results" -> JsArray(results))
- // edge to json
- val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
- val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType)
- if (edge.isDegree && fromOpt.isDefined) {
- if (queryOption.limitOpt.isEmpty) {
- degrees += Json.obj(
- "from" -> fromOpt.get,
- "label" -> queryRequest.queryParam.label.label,
- "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir),
- LabelMeta.degree.name -> innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG)
- )
- }
- } else {
- val keyWithJs = edgeToJson(edge, score, queryRequest.query, queryRequest.queryParam)
- val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match {
- case 0 =>
- (None, None, None, None)
- case 1 =>
- val it = orderByColumns.iterator
- val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- (v1, None, None, None)
- case 2 =>
- val it = orderByColumns.iterator
- val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- (v1, v2, None, None)
- case 3 =>
- val it = orderByColumns.iterator
- val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- (v1, v2, v3, None)
- case _ =>
- val it = orderByColumns.iterator
- val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- (v1, v2, v3, v4)
- }
+ if (queryOption.impIdOpt.isDefined) kvs.append(Experiment.ImpressionKey -> JsString(queryOption.impIdOpt.get))
- val currentEdge = (keyWithJs, score * scoreWeight, orderByValues)
- rawEdges += currentEdge
- }
- }
- (degrees, rawEdges)
+ JsObject(kvs)
}
- private def buildResultJsValue(queryOption: QueryOption,
- degrees: ListBuffer[JsValue],
- rawEdges: ListBuffer[RAW_EDGE]): JsValue = {
- if (queryOption.groupByColumns.isEmpty) {
- // ordering
- val filteredEdges = rawEdges.filter(t => t._2 >= queryOption.scoreThreshold)
+ def toJson(graph: Graph,
+ queryOption: QueryOption,
+ stepResult: StepResult): JsValue = {
- val edges = queryOption.limitOpt match {
- case None => orderBy(queryOption, filteredEdges).map(_._1)
- case Some(limit) => orderBy(queryOption, filteredEdges).map(_._1).take(limit)
- }
- val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees
- Json.obj(
- "size" -> edges.size,
- "degrees" -> resultDegrees,
-// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
- "results" -> edges
- )
- } else {
- val grouped = rawEdges.groupBy { case (keyWithJs, _, _) =>
- val props = keyWithJs.get("props")
+ val degrees =
+ if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(t.s2Edge, t.score, true))
+ else emptyDegrees
- for {
- column <- queryOption.groupByColumns
- value <- keyWithJs.get(column) match {
- case None => props.flatMap { js => (js \ column).asOpt[JsValue] }
- case Some(x) => Some(x)
- }
- } yield column -> value
+ if (queryOption.groupBy.keys.isEmpty) {
+ // no group by specified on query.
+
+ val ls = stepResult.results.map { t =>
+ val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
+ s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
}
+ withImpressionId(queryOption, ls.size, degrees, ls)
+ } else {
- val groupedEdgesWithScoreSum =
+ val results =
for {
- (groupByKeyVals, groupedRawEdges) <- grouped
- scoreSum = groupedRawEdges.map(x => x._2).sum if scoreSum >= queryOption.scoreThreshold
+ (groupByValues, (scoreSum, edges)) <- stepResult.grouped
} yield {
- // ordering
- val edges = orderBy(queryOption, groupedRawEdges).map(_._1)
+ val groupByKeyValues = queryOption.groupBy.keys.zip(groupByValues).map { case (k, valueOpt) =>
+ k -> valueOpt.flatMap(anyValToJsValue).getOrElse(JsNull)
+ }
+ val groupByValuesJson = Json.toJson(groupByKeyValues.toMap)
- //TODO: refactor this
- val js = if (queryOption.returnAgg)
+ if (!queryOption.returnAgg) {
Json.obj(
- "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+ "groupBy" -> groupByValuesJson,
"scoreSum" -> scoreSum,
- "agg" -> edges
+ "agg" -> Json.arr()
)
- else
+ } else {
+ val agg = edges.map { t =>
+ val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
+ s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
+ }
+ val aggJson = Json.toJson(agg)
Json.obj(
- "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+ "groupBy" -> groupByValuesJson,
"scoreSum" -> scoreSum,
- "agg" -> Json.arr()
+ "agg" -> aggJson
)
- (js, scoreSum)
- }
-
- val groupedSortedJsons = queryOption.limitOpt match {
- case None =>
- groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1)
- case Some(limit) =>
- groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1).take(limit)
- }
- val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees
- Json.obj(
- "size" -> groupedSortedJsons.size,
- "degrees" -> resultDegrees,
-// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
- "results" -> groupedSortedJsons
- )
- }
- }
-
- def toSimpleVertexArrJsonMulti(queryOption: QueryOption,
- resultWithExcludeLs: Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])],
- excludes: Seq[QueryRequestWithResult]): JsValue = {
- val excludeIds = (Seq((Seq.empty, excludes)) ++ resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, excludes)) =>
- acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap
- }
-
- val (degrees, rawEdges) = (ListBuffer.empty[JsValue], ListBuffer.empty[RAW_EDGE])
- for {
- (result, localExclude) <- resultWithExcludeLs
- } {
- val newResult = result.map { queryRequestWithResult =>
- val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- val newQuery = queryRequest.query.copy(queryOption = queryOption)
- queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = newQuery))
- }
- val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, excludeIds)
- degrees ++= _degrees
- rawEdges ++= _rawEdges
- }
- buildResultJsValue(queryOption, degrees, rawEdges)
- }
-
- def toSimpleVertexArrJson(queryOption: QueryOption,
- queryRequestWithResultLs: Seq[QueryRequestWithResult],
- exclude: Seq[QueryRequestWithResult]): JsValue = {
- val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
- val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds)
- buildResultJsValue(queryOption, degrees, rawEdges)
- }
-
-
- def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult],
- exclude: Seq[QueryRequestWithResult]): JsValue = {
-
- queryRequestWithResultLs.headOption.map { queryRequestWithResult =>
- val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- val query = queryRequest.query
- val queryOption = query.queryOption
- val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
- val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds)
- buildResultJsValue(queryOption, degrees, rawEdges)
- } getOrElse emptyResults
- }
-
- def verticesToJson(vertices: Iterable[Vertex]) = {
- Json.toJson(vertices.flatMap { v => vertexToJson(v) })
- }
-
- def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
- for {
- (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
- labelMeta <- queryParam.label.metaPropsMap.get(seq)
- jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType)
- } yield labelMeta.name -> jsValue
- }
-
- private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, queryParam: QueryParam): JsValue = {
- if (parentEdges.isEmpty) {
- JsNull
- } else {
- val parents = for {
- parent <- parentEdges
- (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get
- parentQueryParam = QueryParam(parentEdge.labelWithDir)
- parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if parents != JsNull
- } yield {
- val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge)
- val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, parentQueryParam) + ("parents" -> parents)
- Json.toJson(edgeJson)
- }
-
- Json.toJson(parents)
- }
- }
-
- /** TODO */
- def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
- val (srcColumn, tgtColumn) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
-
- val kvMapOpt = for {
- from <- innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType)
- to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType)
- } yield {
- val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props"
- val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam)
- val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap
-
- val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, column) =>
- val jsValue = column match {
- case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - (System.currentTimeMillis() - queryParam.timestamp))
- case "from" => from
- case "to" => to
- case "label" => JsString(queryParam.label.label)
- case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
- case "_timestamp" | "timestamp" => JsNumber(edge.ts)
- case "score" => JsNumber(score)
- case "props" if propsMap.nonEmpty => Json.toJson(propsMap)
- case _ => JsNull
+ }
}
-
- if (jsValue == JsNull) map else map + (column -> jsValue)
- }
- kvMap
- }
-
- kvMapOpt.getOrElse(Map.empty)
- }
-
- def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
- val kvs = edgeToJsonInner(edge, score, q, queryParam)
- if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> Json.toJson(edgeParent(edge.parentEdges, q, queryParam)))
- else kvs
- }
-
- def vertexToJson(vertex: Vertex): Option[JsObject] = {
- val serviceColumn = ServiceColumn.findById(vertex.id.colId)
-
- for {
- id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType)
- } yield {
- Json.obj("serviceName" -> serviceColumn.service.serviceName,
- "columnName" -> serviceColumn.columnName,
- "id" -> id, "props" -> propsToJson(vertex),
- "timestamp" -> vertex.ts,
- // "belongsTo" -> vertex.belongLabelIds)
- "belongsTo" -> vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label)))
+ withImpressionId(queryOption, results.size, degrees, results)
}
}
-
- private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, InnerValLike]) = {
- for {
- (seq, value) <- props
- name <- seqsToNames.get(seq)
- } yield (name, value)
- }
-
- private def propsToJson(vertex: Vertex) = {
- val serviceColumn = vertex.serviceColumn
- val props = for {
- (propKey, innerVal) <- vertex.props
- columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, propKey.toByte, useCache = true)
- jsValue <- innerValToJsValue(innerVal, columnMeta.dataType)
- } yield {
- (columnMeta.name -> jsValue)
- }
- props.toMap
- }
-
- def propsToJson(edge: Edge) = {
- for {
- (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
- metaProp <- edge.label.metaPropsMap.get(seq)
- jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType)
- } yield {
- (metaProp.name, jsValue)
- }
- }
-
- def summarizeWithListExclude(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = {
- val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
-
-
- val groupedEdgesWithRank = (for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields))
- } yield {
- (edge, score)
- }).groupBy { case (edge, score) =>
- (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId)
- }
-
- val jsons = for {
- ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank
- (edges, ranks) = edgesAndRanks.groupBy(x => x._1.srcVertex).map(_._2.head).unzip
- tgtId <- innerValToJsValue(target, tgtColumn.columnType)
- } yield {
- Json.obj(tgtColumn.columnName -> tgtId,
- s"${srcColumn.columnName}s" ->
- edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)), "scoreSum" -> ranks.sum)
- }
- val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ "scoreSum").as[Double] }.reverse
- if (queryRequestWithResultLs.isEmpty) {
- Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
- } else {
- Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons,
- "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId())
- }
-
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 28d78a9..ef40688 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core
import com.google.common.hash.Hashing
import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.parsers.{Where, WhereParser}
import org.apache.s2graph.core.types.{HBaseSerializable, InnerVal, InnerValLike, LabelWithDirection}
@@ -51,6 +52,12 @@ object Query {
}
}
+object GroupBy {
+ val Empty = GroupBy()
+}
+case class GroupBy(keys: Seq[String] = Nil,
+ limit: Int = Int.MaxValue)
+
case class MultiQuery(queries: Seq[Query],
weights: Seq[Double],
queryOption: QueryOption,
@@ -58,7 +65,7 @@ case class MultiQuery(queries: Seq[Query],
case class QueryOption(removeCycle: Boolean = false,
selectColumns: Seq[String] = Seq.empty,
- groupByColumns: Seq[String] = Seq.empty,
+ groupBy: GroupBy = GroupBy.Empty,
orderByColumns: Seq[(String, Boolean)] = Seq.empty,
filterOutQuery: Option[Query] = None,
filterOutFields: Seq[String] = Seq(LabelMeta.to.name),
@@ -67,8 +74,11 @@ case class QueryOption(removeCycle: Boolean = false,
limitOpt: Option[Int] = None,
returnAgg: Boolean = true,
scoreThreshold: Double = Double.MinValue,
- returnDegree: Boolean = true)
-
+ returnDegree: Boolean = true,
+ impIdOpt: Option[String] = None) {
+ val orderByKeys = orderByColumns.map(_._1)
+ val ascendingVals = orderByColumns.map(_._2)
+}
case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
steps: IndexedSeq[Step] = Vector.empty[Step],
@@ -77,8 +87,7 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
val removeCycle = queryOption.removeCycle
val selectColumns = queryOption.selectColumns
-// val groupBy = queryOption.groupBy
- val groupByColumns = queryOption.groupByColumns
+ val groupBy = queryOption.groupBy
val orderByColumns = queryOption.orderByColumns
val filterOutQuery = queryOption.filterOutQuery
val filterOutFields = queryOption.filterOutFields
@@ -90,7 +99,7 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
def cacheKeyBytes: Array[Byte] = {
val selectBytes = Bytes.toBytes(queryOption.selectColumns.toString)
- val groupBytes = Bytes.toBytes(queryOption.groupByColumns.toString)
+ val groupBytes = Bytes.toBytes(queryOption.groupBy.keys.toString)
val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString)
val filterOutBytes = queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte])
val returnTreeBytes = Bytes.toBytes(queryOption.returnTree)
@@ -279,11 +288,23 @@ case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = S
bytes
}
}
+case class S2Request(labelName: String,
+ direction: String = "out",
+ ts: Long = System.currentTimeMillis(),
+ options: Map[String, Any] = Map.empty) {
+ val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+ val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+ val labelWithDir = LabelWithDirection(label.id.get, dir)
+ //TODO: need to merge options into queryParam.
+ val queryParam = QueryParam(labelWithDir, ts)
+}
object QueryParam {
lazy val Empty = QueryParam(LabelWithDirection(0, 0))
lazy val DefaultThreshold = Double.MinValue
val Delimiter = ","
+ val maxMetaByte = (-1).toByte
+ val fillArray = Array.fill(100)(maxMetaByte)
}
case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System.currentTimeMillis()) {
@@ -381,13 +402,13 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System
}
def limit(offset: Int, limit: Int): QueryParam = {
- /* since degree info is located on first always */
- if (offset == 0 && this.columnRangeFilter == null) {
- this.limit = limit + 1
- this.offset = offset
- } else {
- this.limit = limit
- this.offset = offset + 1
+ /** since degree info is located on first always */
+ this.limit = limit
+ this.offset = offset
+
+ if (this.columnRangeFilter == null) {
+ if (offset == 0) this.limit = limit + 1
+ else this.offset = offset + 1
}
// this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, this.offset)
this
@@ -400,26 +421,24 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System
}
}
- def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = {
- // val len = label.orderTypes.size.toByte
- // val len = label.extraIndicesMap(labelOrderSeq).sortKeyTypes.size.toByte
- // logger.error(s"indicesMap: ${label.indicesMap(labelOrderSeq)}")
- val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
+ def paddingInterval(len: Byte, from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]) = {
+ val fromVal = Bytes.add(propsToBytes(from), QueryParam.fillArray)
+ val toVal = propsToBytes(to)
- val minMetaByte = InnerVal.minMetaByte
- // val maxMetaByte = InnerVal.maxMetaByte
- val maxMetaByte = -1.toByte
- val toVal = Bytes.add(propsToBytes(to), Array.fill(1)(minMetaByte))
- //FIXME
- val fromVal = Bytes.add(propsToBytes(from), Array.fill(10)(maxMetaByte))
toVal(0) = len
fromVal(0) = len
- val maxBytes = fromVal
- val minBytes = toVal
+
+ val minMax = (toVal, fromVal) // inverted
+ minMax
+ }
+
+ def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = {
+ val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
+ val (minBytes, maxBytes) = paddingInterval(len, from, to)
+
this.columnRangeFilterMaxBytes = maxBytes
this.columnRangeFilterMinBytes = minBytes
- val rangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true)
- this.columnRangeFilter = rangeFilter
+ this.columnRangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true)
this
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 5343659..5b2622f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -21,31 +21,42 @@ package org.apache.s2graph.core
import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
-import scala.collection.Seq
+import scala.collection.mutable.ListBuffer
+import scala.collection.{Seq, mutable}
object QueryResult {
- def fromVertices(query: Query): Seq[QueryRequestWithResult] = {
+ def fromVertices(query: Query): StepInnerResult = {
if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) {
- Seq.empty
+ StepInnerResult.Empty
} else {
val queryParam = query.steps.head.queryParams.head
val label = queryParam.label
val currentTs = System.currentTimeMillis()
val propsWithTs = Map(LabelMeta.timeStampSeq ->
InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs))
- for {
+ val edgeWithScoreLs = for {
vertex <- query.vertices
} yield {
- val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
- val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
- QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam),
- QueryResult(edgeWithScoreLs = Seq(edgeWithScore)))
- }
+ val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
+ val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
+ edgeWithScore
+ }
+ StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, Nil, false)
}
}
}
-case class QueryRequestWithResult(queryRequest: QueryRequest, queryResult: QueryResult)
+/** inner traverse */
+object StepInnerResult {
+ val Failure = StepInnerResult(Nil, Nil, true)
+ val Empty = StepInnerResult(Nil, Nil, false)
+}
+case class StepInnerResult(edgesWithScoreLs: Seq[EdgeWithScore],
+ degreeEdges: Seq[EdgeWithScore],
+ isFailure: Boolean = false) {
+ val isEmpty = edgesWithScoreLs.isEmpty
+}
case class QueryRequest(query: Query,
stepIdx: Int,
@@ -59,3 +70,206 @@ case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil,
isFailure: Boolean = false)
case class EdgeWithScore(edge: Edge, score: Double)
+
+
+
+
+
+/** result */
+
+object StepResult {
+
+ type Values = Seq[S2EdgeWithScore]
+ type GroupByKey = Seq[Option[Any]]
+ val EmptyOrderByValues = (None, None, None, None)
+ val Empty = StepResult(Nil, Nil, Nil)
+
+
+ def mergeOrdered(left: StepResult.Values,
+ right: StepResult.Values,
+ queryOption: QueryOption): (Double, StepResult.Values) = {
+ val merged = (left ++ right)
+ val scoreSum = merged.foldLeft(0.0) { case (prev, current) => prev + current.score }
+ if (scoreSum < queryOption.scoreThreshold) (0.0, Nil)
+ else {
+ val ordered = orderBy(queryOption, merged)
+ val filtered = ordered.take(queryOption.groupBy.limit)
+ val newScoreSum = filtered.foldLeft(0.0) { case (prev, current) => prev + current.score }
+ (newScoreSum, filtered)
+ }
+ }
+
+ def orderBy(queryOption: QueryOption, notOrdered: Values): Values = {
+ import OrderingUtil._
+
+ if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
+ notOrdered.sortBy(_.orderByValues)(TupleMultiOrdering[Any](queryOption.ascendingVals))
+ } else {
+ notOrdered
+ }
+ }
+ def toOrderByValues(s2Edge: Edge,
+ score: Double,
+ orderByKeys: Seq[String]): (Any, Any, Any, Any) = {
+ def toValue(propertyKey: String): Any = {
+ propertyKey match {
+ case "score" => score
+ case "timestamp" | "_timestamp" => s2Edge.ts
+ case _ => s2Edge.properties.get(propertyKey)
+ }
+ }
+ if (orderByKeys.isEmpty) (None, None, None, None)
+ else {
+ orderByKeys.length match {
+ case 1 =>
+ (toValue(orderByKeys(0)), None, None, None)
+ case 2 =>
+ (toValue(orderByKeys(0)), toValue(orderByKeys(1)), None, None)
+ case 3 =>
+ (toValue(orderByKeys(0)), toValue(orderByKeys(1)), toValue(orderByKeys(2)), None)
+ case _ =>
+ (toValue(orderByKeys(0)), toValue(orderByKeys(1)), toValue(orderByKeys(2)), toValue(orderByKeys(3)))
+ }
+ }
+ }
+ /**
+ * merge multiple StepResult into one StepResult.
+ * @param queryOption
+ * @param multiStepResults
+ * @return
+ */
+ def merges(queryOption: QueryOption,
+ multiStepResults: Seq[StepResult],
+ weights: Seq[Double] = Nil): StepResult = {
+ val degrees = multiStepResults.flatMap(_.degreeEdges)
+ val ls = new mutable.ListBuffer[S2EdgeWithScore]()
+ val agg= new mutable.HashMap[GroupByKey, ListBuffer[S2EdgeWithScore]]()
+ val sums = new mutable.HashMap[GroupByKey, Double]()
+
+ for {
+ (weight, eachStepResult) <- weights.zip(multiStepResults)
+ (ordered, grouped) = (eachStepResult.results, eachStepResult.grouped)
+ } {
+ ordered.foreach { t =>
+ val newScore = t.score * weight
+ ls += t.copy(score = newScore)
+ }
+
+ // process each query's stepResult's grouped
+ for {
+ (groupByKey, (scoreSum, values)) <- grouped
+ } {
+ val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore])
+ var scoreSum = 0.0
+ values.foreach { t =>
+ val newScore = t.score * weight
+ buffer += t.copy(score = newScore)
+ scoreSum += newScore
+ }
+ sums += (groupByKey -> scoreSum)
+ }
+ }
+
+ // process global groupBy
+ if (queryOption.groupBy.keys.nonEmpty) {
+ for {
+ s2EdgeWithScore <- ls
+ groupByKey = s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys)
+ } {
+ val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore])
+ buffer += s2EdgeWithScore
+ val newScore = sums.getOrElse(groupByKey, 0.0) + s2EdgeWithScore.score
+ sums += (groupByKey -> newScore)
+ }
+ }
+
+
+ val ordered = orderBy(queryOption, ls)
+ val grouped = for {
+ (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1)
+ aggregated = agg(groupByKey) if aggregated.nonEmpty
+ } yield groupByKey -> (scoreSum, aggregated)
+
+ StepResult(results = ordered, grouped = grouped, degrees)
+ }
+
+ //TODO: Optimize this.
+ def filterOut(graph: Graph,
+ queryOption: QueryOption,
+ baseStepResult: StepResult,
+ filterOutStepInnerResult: StepInnerResult): StepResult = {
+
+ val fields = if (queryOption.filterOutFields.isEmpty) Seq("to") else Seq("to")
+ // else queryOption.filterOutFields
+ val filterOutSet = filterOutStepInnerResult.edgesWithScoreLs.map { t =>
+ t.edge.selectValues(fields)
+ }.toSet
+
+ val filteredResults = baseStepResult.results.filter { t =>
+ val filterOutKey = t.s2Edge.selectValues(fields)
+ !filterOutSet.contains(filterOutKey)
+ }
+
+ val grouped = for {
+ (key, (scoreSum, values)) <- baseStepResult.grouped
+ (out, in) = values.partition(v => filterOutSet.contains(v.s2Edge.selectValues(fields)))
+ newScoreSum = scoreSum - out.foldLeft(0.0) { case (prev, current) => prev + current.score } if in.nonEmpty
+ } yield key -> (newScoreSum, in)
+
+
+ StepResult(results = filteredResults, grouped = grouped, baseStepResult.degreeEdges)
+ }
+ def apply(graph: Graph,
+ queryOption: QueryOption,
+ stepInnerResult: StepInnerResult): StepResult = {
+ logger.debug(s"[BeforePostProcess]: ${stepInnerResult.edgesWithScoreLs.size}")
+
+ val results = for {
+ edgeWithScore <- stepInnerResult.edgesWithScoreLs
+ } yield {
+ val edge = edgeWithScore.edge
+ val orderByValues =
+ if (queryOption.orderByColumns.isEmpty) (edgeWithScore.score, None, None, None)
+ else toOrderByValues(edge, edgeWithScore.score, queryOption.orderByKeys)
+
+ S2EdgeWithScore(edge, edgeWithScore.score, orderByValues, edgeWithScore.edge.parentEdges)
+ }
+ /** ordered flatten result */
+ val ordered = orderBy(queryOption, results)
+
+ /** ordered grouped result */
+ val grouped =
+ if (queryOption.groupBy.keys.isEmpty) Nil
+ else {
+ val agg = new mutable.HashMap[GroupByKey, (Double, Values)]()
+ results.groupBy { s2EdgeWithScore =>
+ s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys, useToString = true)
+ }.map { case (k, ls) =>
+ val (scoreSum, merged) = mergeOrdered(ls, Nil, queryOption)
+ /**
+ * watch out here. by calling toString on Any, we lose type information which will be used
+ * later for toJson.
+ */
+ if (merged.nonEmpty) {
+ val newKey = merged.head.s2Edge.selectValues(queryOption.groupBy.keys, useToString = false)
+ agg += (newKey -> (scoreSum, merged))
+ }
+ }
+ agg.toSeq.sortBy(_._2._1 * -1)
+ }
+
+ val degrees = stepInnerResult.degreeEdges.map(t => S2EdgeWithScore(t.edge, t.score))
+ StepResult(results = ordered, grouped = grouped, degreeEdges = degrees)
+ }
+}
+
+case class S2EdgeWithScore(s2Edge: Edge,
+ score: Double,
+ orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues,
+ parentEdges: Seq[EdgeWithScore] = Nil)
+
+case class StepResult(results: StepResult.Values,
+ grouped: Seq[(StepResult.GroupByKey, (Double, StepResult.Values))],
+ degreeEdges: StepResult.Values) {
+ val isEmpty = results.isEmpty
+}
\ No newline at end of file