You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 16:01:23 UTC
[2/3] incubator-s2graph git commit: [S2GRAPH-121]: Create `Result`
class to hold traverse result edges.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
index ede1127..0377bd8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
@@ -19,10 +19,53 @@
package org.apache.s2graph.core
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId}
import play.api.libs.json.Json
-
+//
+//object S2Vertex {
+// def apply(graph: Graph, vertex: Vertex): S2Vertex = {
+// S2Vertex(graph,
+// vertex.serviceName,
+// vertex.serviceColumn.columnName,
+// vertex.innerIdVal,
+// vertex.serviceColumn.innerValsToProps(vertex.props),
+// vertex.ts,
+// GraphUtil.fromOp(vertex.op)
+// )
+// }
+//}
+//
+//case class S2Vertex(graph: Graph,
+// serviceName: String,
+// columnName: String,
+// id: Any,
+// props: Map[String, Any] = Map.empty,
+// ts: Long = System.currentTimeMillis(),
+// operation: String = "insert") extends GraphElement {
+// lazy val vertex = {
+// val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+// val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+// val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+//
+// val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion))
+// val propsInner = column.propsToInnerVals(props) ++
+// Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
+//
+// Vertex(srcVertexId, ts, propsInner, op)
+// }
+//
+// val uniqueId = (serviceName, columnName, id)
+//
+// override def isAsync: Boolean = vertex.isAsync
+//
+// override def toLogString(): String = vertex.toLogString()
+//
+// override def queueKey: String = vertex.queueKey
+//
+// override def queuePartitionKey: String = vertex.queuePartitionKey
+//}
case class Vertex(id: VertexId,
ts: Long = System.currentTimeMillis(),
props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
@@ -31,10 +74,19 @@ case class Vertex(id: VertexId,
val innerId = id.innerId
+ val innerIdVal = innerId.value
+
+ lazy val properties = for {
+ (k, v) <- props
+ meta <- serviceColumn.metasMap.get(k)
+ } yield meta.name -> v.value
+
def schemaVer = serviceColumn.schemaVersion
def serviceColumn = ServiceColumn.findById(id.colId)
+ def columnName = serviceColumn.columnName
+
def service = Service.findById(serviceColumn.serviceId)
lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName)
@@ -114,7 +166,21 @@ object Vertex {
def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue
- // val emptyVertex = Vertex(new CompositeId(CompositeId.defaultColId, CompositeId.defaultInnerId, false, true),
- // System.currentTimeMillis())
- def fromString(s: String): Option[Vertex] = Graph.toVertex(s)
+ def toVertex(serviceName: String,
+ columnName: String,
+ id: Any,
+ props: Map[String, Any] = Map.empty,
+ ts: Long = System.currentTimeMillis(),
+ operation: String = "insert"): Vertex = {
+
+ val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+ val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+ val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+ val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion))
+ val propsInner = column.propsToInnerVals(props) ++
+ Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
+
+ new Vertex(srcVertexId, ts, propsInner, op)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
index 9bd172d..cd825f4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
@@ -25,7 +25,8 @@ import scalikejdbc._
import scala.util.Random
object Experiment extends Model[Experiment] {
- val impressionKey = "S2-Impression-Id"
+ val ImpressionKey = "S2-Impression-Id"
+ val ImpressionId = "Impression-Id"
def apply(rs: WrappedResultSet): Experiment = {
Experiment(rs.intOpt("id"),
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 2734211..f7318f6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -19,13 +19,15 @@
package org.apache.s2graph.core.mysqls
+import java.util.Calendar
import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.JSONParser._
-import play.api.libs.json._
+import play.api.libs.json.{JsObject, JsValue, Json}
import scalikejdbc._
object Label extends Model[Label] {
@@ -48,6 +50,7 @@ object Label extends Model[Label] {
Label.delete(id.get)
}
+
def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
val cacheKey = "label=" + labelName
lazy val labelOpt =
@@ -292,11 +295,16 @@ case class Label(id: Option[Int], label: String,
lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found"))
lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found"))
- lazy val direction = if (isDirected) "out" else "undirected"
lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq)
//TODO: Make sure this is correct
+
+// lazy val metas = metas(useCache = true)
lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
+ lazy val labelMetas = LabelMeta.findAllByLabelId(id.get, useCache = true)
+ lazy val labelMetaSet = labelMetas.toSet
+ lazy val labelMetaMap = (labelMetas ++ LabelMeta.reservedMetas).map(m => m.seq -> m).toMap
+
lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap
lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap
lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap
@@ -327,14 +335,37 @@ case class Label(id: Option[Int], label: String,
jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
} yield prop.name -> jsValue).toMap
+ lazy val metaPropsDefaultMapInnerString = (for {
+ prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+ innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
+ } yield prop.name -> innerVal).toMap
+
lazy val metaPropsDefaultMapInner = (for {
prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+ innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
+ } yield prop -> innerVal).toMap
+ lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq
+ lazy val metaPropsJsValueWithDefault = (for {
+ prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
- } yield prop.name -> jsValue).toMap
+ } yield prop -> jsValue).toMap
+// lazy val extraOptions = Model.extraOptions(Option("""{
+// "storage": {
+// "s2graph.storage.backend": "rocks",
+// "rocks.db.path": "/tmp/db"
+// }
+// }"""))
lazy val extraOptions: Map[String, JsValue] = options match {
case None => Map.empty
- case Some(v) => Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
+ case Some(v) =>
+ try {
+ Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
+ } catch {
+ case e: Exception =>
+ logger.error(s"An error occurs while parsing the extra label option: ${label}", e)
+ Map.empty
+ }
}
def srcColumnWithDir(dir: Int) = {
@@ -386,5 +417,23 @@ case class Label(id: Option[Int], label: String,
)
+ def propsToInnerValsWithTs(props: Map[String, Any],
+ ts: Long = System.currentTimeMillis()): Map[Byte, InnerValLikeWithTs] = {
+ for {
+ (k, v) <- props
+ labelMeta <- metaPropsInvMap.get(k)
+ innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
+ } yield labelMeta.seq -> InnerValLikeWithTs(innerVal, ts)
+
+ }
+
+ def innerValsWithTsToProps(props: Map[Byte, InnerValLikeWithTs]): Map[String, Any] = {
+ for {
+ (k, v) <- props
+ labelMeta <- metaPropsMap.get(k)
+ } yield {
+ labelMeta.name -> v.innerVal.value
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index b68fa79..6fceabc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -24,6 +24,8 @@ package org.apache.s2graph.core.mysqls
*/
import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, InnerValLike}
import play.api.libs.json.Json
import scalikejdbc._
object ServiceColumn extends Model[ServiceColumn] {
@@ -89,13 +91,40 @@ object ServiceColumn extends Model[ServiceColumn] {
})
}
}
-case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) {
+case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) {
lazy val service = Service.findById(serviceId)
lazy val metas = ColumnMeta.findAllByColumn(id.get)
+ lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta } toMap
lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap
lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap
lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
+ def propsToInnerVals(props: Map[String, Any]): Map[Int, InnerValLike] = {
+ for {
+ (k, v) <- props
+ labelMeta <- metasInvMap.get(k)
+ innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
+ } yield labelMeta.seq.toInt -> innerVal
+ }
+
+ def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = {
+ for {
+ (k, v) <- props
+ columnMeta <- metasMap.get(k)
+ } yield {
+ columnMeta.name -> v.value
+ }
+ }
+
+ def innerValsWithTsToProps(props: Map[Int, InnerValLikeWithTs]): Map[String, Any] = {
+ for {
+ (k, v) <- props
+ columnMeta <- metasMap.get(k)
+ } yield {
+ columnMeta.name -> v.innerVal.value
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index 5920f3c..1c93667 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.parsers
import org.apache.s2graph.core.GraphExceptions.WhereParserException
import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
import org.apache.s2graph.core.types.InnerValLike
-import org.apache.s2graph.core.{Edge, GraphExceptions, JSONParser}
+import org.apache.s2graph.core.Edge
import org.apache.s2graph.core.JSONParser._
import scala.annotation.tailrec
import scala.util.Try
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 52ee50d..d77ac7d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -36,9 +36,11 @@ import scala.util.{Failure, Success, Try}
object TemplateHelper {
val findVar = """\"?\$\{(.*?)\}\"?""".r
val num = """(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r
+
val hour = 60 * 60 * 1000L
val day = hour * 24L
val week = day * 7L
+
def calculate(now: Long, n: Int, unit: String): Long = {
val duration = unit match {
case "hour" | "HOUR" => n * hour
@@ -72,12 +74,32 @@ object TemplateHelper {
}
}
-class RequestParser(config: Config) {
+object RequestParser {
+ type ExperimentParam = (JsObject, String, String, String, Option[String])
+ val defaultLimit = 100
+
+ def toJsValues(jsValue: JsValue): List[JsValue] = {
+ jsValue match {
+ case obj: JsObject => List(obj)
+ case arr: JsArray => arr.as[List[JsValue]]
+ case _ => List.empty[JsValue]
+ }
+ }
+
+ def jsToStr(js: JsValue): String = js match {
+ case JsString(s) => s
+ case _ => js.toString()
+ }
+
+}
+
+class RequestParser(graph: Graph) {
import Management.JsonModel._
+ import RequestParser._
- val hardLimit = 100000
- val defaultLimit = 100
+ val config = graph.config
+ val hardLimit = config.getInt("query.hardlimit")
val maxLimit = Int.MaxValue - 1
val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout")
val DefaultMaxAttempt = config.getInt("hbase.client.retries.number")
@@ -106,13 +128,11 @@ class RequestParser(config: Config) {
(labelOrderType.seq, value)
}
}
+
ret
}
- def extractInterval(label: Label, _jsValue: JsValue) = {
- val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString())
- val jsValue = Json.parse(replaced)
-
+ def extractInterval(label: Label, jsValue: JsValue) = {
def extractKv(js: JsValue) = js match {
case JsObject(map) => map.toSeq
case JsArray(arr) => arr.flatMap {
@@ -135,15 +155,21 @@ class RequestParser(config: Config) {
ret
}
- def extractDuration(label: Label, _jsValue: JsValue) = {
- val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString())
- val jsValue = Json.parse(replaced)
-
+ def extractDuration(label: Label, jsValue: JsValue) = {
for {
js <- parseOption[JsObject](jsValue, "duration")
} yield {
- val minTs = parseOption[Long](js, "from").getOrElse(Long.MaxValue)
- val maxTs = parseOption[Long](js, "to").getOrElse(Long.MinValue)
+ val minTs = (js \ "from").get match {
+ case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong
+ case JsNumber(n) => n.toLong
+ case _ => Long.MinValue
+ }
+
+ val maxTs = (js \ "to").get match {
+ case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong
+ case JsNumber(n) => n.toLong
+ case _ => Long.MaxValue
+ }
if (minTs > maxTs) {
throw new BadQueryException("Duration error. Timestamp of From cannot be larger than To.")
@@ -167,21 +193,24 @@ class RequestParser(config: Config) {
}
ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike])
}
-
+
def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = {
whereClauseOpt match {
case None => Success(WhereParser.success)
- case Some(_where) =>
- val where = TemplateHelper.replaceVariable(System.currentTimeMillis(), _where)
+ case Some(where) =>
val whereParserKey = s"${label.label}_${where}"
+
parserCache.get(whereParserKey, new Callable[Try[Where]] {
override def call(): Try[Where] = {
- WhereParser(label).parse(where) match {
+ val _where = TemplateHelper.replaceVariable(System.currentTimeMillis(), where)
+
+ WhereParser(label).parse(_where) match {
case s@Success(_) => s
case Failure(ex) => throw BadQueryException(ex.getMessage, ex)
}
}
})
+
}
}
@@ -194,28 +223,38 @@ class RequestParser(config: Config) {
} yield {
Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis())
}
- vertices.toSeq
+
+ vertices
}
- def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery = {
+ def toMultiQuery(jsValue: JsValue, impIdOpt: Option[String]): MultiQuery = {
val queries = for {
queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty)
} yield {
- toQuery(queryJson, isEdgeQuery)
+ toQuery(queryJson, impIdOpt = impIdOpt)
}
val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0))
- MultiQuery(queries = queries, weights = weights,
- queryOption = toQueryOption(jsValue), jsonQuery = jsValue)
+ MultiQuery(queries = queries, weights = weights, queryOption = toQueryOption(jsValue, impIdOpt))
}
- def toQueryOption(jsValue: JsValue): QueryOption = {
+ def toQueryOption(jsValue: JsValue, impIdOpt: Option[String]): QueryOption = {
val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name))
- val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q =>
+ val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v, impIdOpt = impIdOpt) }.map { q =>
q.copy(queryOption = q.queryOption.copy(filterOutFields = filterOutFields))
}
val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true)
val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty)
- val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty)
+// val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty)
+ val groupBy = (jsValue \ "groupBy").asOpt[JsValue].getOrElse(JsNull) match {
+ case obj: JsObject =>
+ val keys = (obj \ "key").asOpt[Seq[String]].getOrElse(Nil)
+ val groupByLimit = (obj \ "limit").asOpt[Int].getOrElse(hardLimit)
+ GroupBy(keys, groupByLimit)
+ case arr: JsArray =>
+ val keys = arr.asOpt[Seq[String]].getOrElse(Nil)
+ GroupBy(keys)
+ case _ => GroupBy.Empty
+ }
val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs =>
for {
js <- jsLs
@@ -238,7 +277,7 @@ class RequestParser(config: Config) {
QueryOption(removeCycle = removeCycle,
selectColumns = selectColumns,
- groupByColumns = groupByColumns,
+ groupBy = groupBy,
orderByColumns = orderByColumns,
filterOutQuery = filterOutQuery,
filterOutFields = filterOutFields,
@@ -247,10 +286,12 @@ class RequestParser(config: Config) {
limitOpt = limitOpt,
returnAgg = returnAgg,
scoreThreshold = scoreThreshold,
- returnDegree = returnDegree
+ returnDegree = returnDegree,
+ impIdOpt = impIdOpt
)
}
- def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = {
+
+ def toQuery(jsValue: JsValue, impIdOpt: Option[String]): Query = {
try {
val vertices =
(for {
@@ -274,7 +315,7 @@ class RequestParser(config: Config) {
if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty")
val steps = parse[Vector[JsValue]](jsValue, "steps")
- val queryOption = toQueryOption(jsValue)
+ val queryOption = toQueryOption(jsValue, impIdOpt)
val querySteps =
steps.zipWithIndex.map { case (step, stepIdx) =>
@@ -332,7 +373,7 @@ class RequestParser(config: Config) {
}
- val ret = Query(vertices, querySteps, queryOption, jsValue)
+ val ret = Query(vertices, querySteps, queryOption)
// logger.debug(ret.toString)
ret
} catch {
@@ -354,10 +395,8 @@ class RequestParser(config: Config) {
val limit = {
parseOption[Int](labelGroup, "limit") match {
case None => defaultLimit
- case Some(l) if l < 0 => maxLimit
- case Some(l) if l >= 0 =>
- val default = hardLimit
- Math.min(l, default)
+ case Some(l) if l < 0 => hardLimit
+ case Some(l) if l >= 0 => Math.min(l, hardLimit)
}
}
val offset = parseOption[Int](labelGroup, "offset").getOrElse(0)
@@ -405,7 +444,6 @@ class RequestParser(config: Config) {
// FIXME: Order of command matter
QueryParam(labelWithDir)
.sample(sample)
- .limit(offset, limit)
.rank(RankParam(label.id.get, scoring))
.exclude(exclude)
.include(include)
@@ -413,6 +451,7 @@ class RequestParser(config: Config) {
.has(hasFilter)
.labelOrderSeq(indexSeq)
.interval(interval)
+ .limit(offset, limit)
.where(where)
.duplicatePolicy(duplicate)
.includeDegree(includeDegree)
@@ -450,21 +489,8 @@ class RequestParser(config: Config) {
}
}
- def toJsValues(jsValue: JsValue): List[JsValue] = {
- jsValue match {
- case obj: JsObject => List(obj)
- case arr: JsArray => arr.as[List[JsValue]]
- case _ => List.empty[JsValue]
- }
- }
-
- def jsToStr(js: JsValue): String = js match {
- case JsString(s) => s
- case _ => js.toString()
- }
-
def parseBulkFormat(str: String): Seq[(GraphElement, String)] = {
- val edgeStrs = str.split("\\n")
+ val edgeStrs = str.split("\\n").filterNot(_.isEmpty)
val elementsWithTsv = for {
edgeStr <- edgeStrs
str <- GraphUtil.parseString(edgeStr)
@@ -480,24 +506,23 @@ class RequestParser(config: Config) {
}
private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, String)] = {
- val srcId = (jsValue \ "from").asOpt[JsValue].map(jsToStr)
- val tgtId = (jsValue \ "to").asOpt[JsValue].map(jsToStr)
- val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(jsToStr)) ++ srcId
- val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(tos => tos.map(jsToStr)) ++ tgtId
+ val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil)
+ val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil)
val label = parse[String](jsValue, "label")
val timestamp = parse[Long](jsValue, "timestamp")
- val direction = parseOption[String](jsValue, "direction").getOrElse("")
- val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
+ val direction = parseOption[String](jsValue, "direction").getOrElse("out")
+ val propsJson = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
for {
- srcId <- srcIds
- tgtId <- tgtIds
+ srcId <- srcIds.flatMap(jsValueToAny(_).toSeq)
+ tgtId <- tgtIds.flatMap(jsValueToAny(_).toSeq)
} yield {
- val edge = Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString)
+ // val edge = Management.toEdge(graph, timestamp, operation, srcId, tgtId, label, direction, fromJsonToProperties(propsJson))
+ val edge = Edge.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation)
val tsv = (jsValue \ "direction").asOpt[String] match {
- case None => Seq(timestamp, operation, "e", srcId, tgtId, label, props).mkString("\t")
- case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, props, dir).mkString("\t")
+ case None => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString).mkString("\t")
+ case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString, dir).mkString("\t")
}
(edge, tsv)
@@ -513,8 +538,8 @@ class RequestParser(config: Config) {
val ts = parseOption[Long](jsValue, "timestamp").getOrElse(System.currentTimeMillis())
val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get
val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get
- val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
- Management.toVertex(ts, operation, id.toString, sName, cName, props.toString)
+ val props = fromJsonToProperties((jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()))
+ Vertex.toVertex(sName, cName, id.toString, props, ts, operation)
}
def toPropElements(jsObj: JsValue) = Try {
@@ -637,7 +662,7 @@ class RequestParser(config: Config) {
def toDeleteParam(json: JsValue) = {
val labelName = (json \ "label").as[String]
- val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil).filterNot(_.isAsync)
+ val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil)
val direction = (json \ "direction").asOpt[String].getOrElse("out")
val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
@@ -645,4 +670,30 @@ class RequestParser(config: Config) {
val vertices = toVertices(labelName, direction, ids)
(labels, direction, ids, ts, vertices)
}
+
+ def toFetchAndDeleteParam(json: JsValue) = {
+ val labelName = (json \ "label").as[String]
+ val fromOpt = (json \ "from").asOpt[JsValue]
+ val toOpt = (json \ "to").asOpt[JsValue]
+ val direction = (json \ "direction").asOpt[String].getOrElse("out")
+ val indexOpt = (json \ "index").asOpt[String]
+ val propsOpt = (json \ "props").asOpt[JsObject]
+ (labelName, fromOpt, toOpt, direction, indexOpt, propsOpt)
+ }
+
+ def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = jsQuery.as[Seq[JsObject]].map { obj =>
+ def _require(field: String) = throw new RuntimeException(s"${field} not found")
+
+ val accessToken = (obj \ "accessToken").asOpt[String].getOrElse(_require("accessToken"))
+ val experimentName = (obj \ "experiment").asOpt[String].getOrElse(_require("experiment"))
+ val uuid = (obj \ "#uuid").get match {
+ case JsString(s) => s
+ case JsNumber(n) => n.toString
+ case _ => _require("#uuid")
+ }
+ val body = (obj \ "params").asOpt[JsObject].getOrElse(Json.obj())
+ val impKeyOpt = (obj \ Experiment.ImpressionKey).asOpt[String]
+
+ (body, accessToken, experimentName, uuid, impKeyOpt)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 55b3e79..4c77ad6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -21,7 +21,8 @@ package org.apache.s2graph.core.rest
import java.net.URL
-import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
import org.apache.s2graph.core.utils.logger
@@ -29,8 +30,21 @@ import play.api.libs.json._
import scala.concurrent.{ExecutionContext, Future}
-
object RestHandler {
+ trait CanLookup[A] {
+ def lookup(m: A, key: String): Option[String]
+ }
+
+ object CanLookup {
+ implicit val oneTupleLookup = new CanLookup[(String, String)] {
+ override def lookup(m: (String, String), key: String) =
+ if (m._1 == key) Option(m._2) else None
+ }
+ implicit val hashMapLookup = new CanLookup[Map[String, String]] {
+ override def lookup(m: Map[String, String], key: String): Option[String] = m.get(key)
+ }
+ }
+
case class HandlerResult(body: Future[JsValue], headers: (String, String)*)
}
@@ -41,25 +55,31 @@ object RestHandler {
class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
import RestHandler._
- val requestParser = new RequestParser(graph.config)
+ val requestParser = new RequestParser(graph)
+
/**
* Public APIS
*/
- def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): HandlerResult = {
+ def doPost[A](uri: String, body: String, headers: A)(implicit ev: CanLookup[A]): HandlerResult = {
+ val impKeyOpt = ev.lookup(headers, Experiment.ImpressionKey)
+ val impIdOpt = ev.lookup(headers, Experiment.ImpressionId)
+
try {
val jsQuery = Json.parse(body)
uri match {
- case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
- case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
- case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
- case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+// case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toSimpleVertexArrJson))
+ case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toJson))
+// case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
+// case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
+// case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
case "/graphs/checkEdges" => checkEdges(jsQuery)
- case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
- case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
- case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+// case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
+// case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
+// case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery))
+ case "/graphs/experiments" => experiments(jsQuery)
case uri if uri.startsWith("/graphs/experiment") =>
val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3)
experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt)
@@ -75,17 +95,8 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
try {
val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue)
- HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs =>
- val edgeJsons = for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- convertedEdge = if (isReverted) edge.duplicateEdge else edge
- edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
- } yield Json.toJson(edgeJson)
-
- Json.toJson(edgeJsons)
+ HandlerResult(graph.checkEdges(quads).map { case stepResult =>
+ PostProcess.toJson(graph, QueryOption(), stepResult)
})
} catch {
case e: Exception => HandlerResult(Future.failed(e))
@@ -93,11 +104,23 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
}
- /**
- * Private APIS
- */
- private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String]): HandlerResult = {
+ private def experiments(jsQuery: JsValue): HandlerResult = {
+ val params: Seq[RequestParser.ExperimentParam] = requestParser.parseExperiment(jsQuery)
+
+ val results = params map { case (body, token, experimentName, uuid, impKeyOpt) =>
+ val handlerResult = experiment(body, token, experimentName, uuid, impKeyOpt)
+ val future = handlerResult.body.recover {
+ case e: Exception => PostProcess.emptyResults ++ Json.obj("error" -> Json.obj("reason" -> e.getMessage))
+ }
+
+ future
+ }
+
+ val result = Future.sequence(results).map(JsArray)
+ HandlerResult(body = result)
+ }
+ private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String] = None): HandlerResult = {
try {
val bucketOpt = for {
service <- Service.findByAccessToken(accessToken)
@@ -108,7 +131,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found"))
if (bucket.isGraphQuery) {
val ret = buildRequestInner(contentsBody, bucket, uuid)
- HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId)
+ HandlerResult(ret.body, Experiment.ImpressionKey -> bucket.impressionId)
}
else throw new RuntimeException("not supported yet")
} catch {
@@ -127,119 +150,54 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
val experimentLog = s"POST $path took -1 ms 200 -1 $body"
logger.debug(experimentLog)
- doPost(path, body)
- }
- }
-
- private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
- val filterOutQueryResultsLs = q.filterOutQuery match {
- case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
- case None => Future.successful(Seq.empty)
- }
-
- for {
- queryResultsLs <- graph.getEdges(q)
- filterOutResultsLs <- filterOutQueryResultsLs
- } yield {
- val json = post(queryResultsLs, filterOutResultsLs)
- json
+ doPost(path, body, Experiment.ImpressionId -> bucket.impressionId)
}
}
- def getEdgesAsync(jsonQuery: JsValue)
- (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-
- val fetch = eachQuery(post) _
+ def getEdgesAsync(jsonQuery: JsValue, impIdOpt: Option[String] = None)
+ (post: (Graph, QueryOption, StepResult) => JsValue): Future[JsValue] = {
jsonQuery match {
- case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
case obj@JsObject(_) =>
(obj \ "queries").asOpt[JsValue] match {
- case None => fetch(requestParser.toQuery(obj))
+ case None =>
+ val query = requestParser.toQuery(obj, impIdOpt)
+ graph.getEdges(query).map(post(graph, query.queryOption, _))
case _ =>
- val multiQuery = requestParser.toMultiQuery(obj)
- val filterOutFuture = multiQuery.queryOption.filterOutQuery match {
- case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
- case None => Future.successful(Seq.empty)
- }
- val futures = multiQuery.queries.zip(multiQuery.weights).map { case (query, weight) =>
- val filterOutQueryResultsLs = query.queryOption.filterOutQuery match {
- case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
- case None => Future.successful(Seq.empty)
- }
- for {
- queryRequestWithResultLs <- graph.getEdges(query)
- filterOutResultsLs <- filterOutQueryResultsLs
- } yield {
- val newQueryRequestWithResult = for {
- queryRequestWithResult <- queryRequestWithResultLs
- queryResult = queryRequestWithResult.queryResult
- } yield {
- val newEdgesWithScores = for {
- edgeWithScore <- queryRequestWithResult.queryResult.edgeWithScoreLs
- } yield {
- edgeWithScore.copy(score = edgeWithScore.score * weight)
- }
- queryRequestWithResult.copy(queryResult = queryResult.copy(edgeWithScoreLs = newEdgesWithScores))
- }
- logger.debug(s"[Size]: ${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}")
- (newQueryRequestWithResult, filterOutResultsLs)
- }
- }
- for {
- filterOut <- filterOutFuture
- resultWithExcludeLs <- Future.sequence(futures)
- } yield {
- PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, resultWithExcludeLs, filterOut)
- // val initial = (ListBuffer.empty[QueryRequestWithResult], ListBuffer.empty[QueryRequestWithResult])
- // val (results, excludes) = resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), (results, excludes)) =>
- // (prevResults ++= results, prevExcludes ++= excludes)
- // }
- // PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ filterOut)
- }
+ val multiQuery = requestParser.toMultiQuery(obj, impIdOpt)
+ graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _))
}
- case _ => throw BadQueryException("Cannot support")
- }
- }
-
- private def getEdgesExcludedAsync(jsonQuery: JsValue)
- (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
- val q = requestParser.toQuery(jsonQuery)
- val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
- val fetchFuture = graph.getEdges(q)
- val excludeFuture = graph.getEdges(filterOutQuery)
+ case JsArray(arr) =>
+ val queries = arr.map(requestParser.toQuery(_, impIdOpt))
+ val weights = queries.map(_ => 1.0)
+ val multiQuery = MultiQuery(queries, weights, QueryOption(), jsonQuery)
+ graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _))
- for {
- queryResultLs <- fetchFuture
- exclude <- excludeFuture
- } yield {
- post(queryResultLs, exclude)
+ case _ => throw BadQueryException("Cannot support")
}
}
private def getVertices(jsValue: JsValue) = {
val jsonQuery = jsValue
- val ts = System.currentTimeMillis()
- val props = "{}"
val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
val serviceName = (js \ "serviceName").as[String]
val columnName = (js \ "columnName").as[String]
- for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
- Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
+ for {
+ idJson <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])
+ id <- jsValueToAny(idJson)
+ } yield {
+ Vertex.toVertex(serviceName, columnName, id)
}
}
graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
}
+
private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = {
var body = bucket.requestBody.replace("#uuid", uuid)
- // // replace variable
- // body = TemplateHelper.replaceVariable(System.currentTimeMillis(), body)
-
- // replace param
for {
requestKeyJson <- requestKeyJsonOpt
jsObj <- requestKeyJson.asOpt[JsObject]
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index eaa25af..a6e81b4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -19,12 +19,10 @@
package org.apache.s2graph.core.storage
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.{Executors, TimeUnit}
import com.typesafe.config.Config
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.s2graph.core.ExceptionHandler.{KafkaMessage, Key, Val}
import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
@@ -37,13 +35,15 @@ import org.apache.s2graph.core.utils.{Extensions, logger}
import scala.annotation.tailrec
import scala.collection.Seq
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Promise, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Random, Try}
-abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
+abstract class Storage[R](val graph: Graph,
+ val config: Config)(implicit ec: ExecutionContext) {
import HBaseType._
/** storage dependent configurations */
+ val DeleteAllFetchCount = config.getInt("delete.all.fetch.count")
val MaxRetryNum = config.getInt("max.retry.number")
val MaxBackOff = config.getInt("max.back.off")
val BackoffTimeout = config.getInt("back.off.timeout")
@@ -57,11 +57,15 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
/** retry scheduler */
val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
+
/** handle mutate failed */
val exceptionHandler = new ExceptionHandler(config)
-
val failTopic = s"mutateFailed_${config.getString("phase")}"
+ /** fallback */
+ val fallback = Future.successful(StepResult.Empty)
+ val innerFallback = Future.successful(StepInnerResult.Empty)
+
/**
* Compatibility table
* | label schema version | snapshot edge | index edge | vertex | note |
@@ -229,7 +233,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
* @return
*/
def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)],
- prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]]
+ prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepInnerResult]]
/**
* fetch Vertex for given request from storage.
@@ -324,7 +328,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = {
val (strongEdges, weakEdges) =
- edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk"))
+ (edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")))
val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map { case (zkQuorum, edges) =>
val mutations = edges.flatMap { edge =>
@@ -449,7 +453,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
future recoverWith {
case FetchTimeoutException(retryEdge) =>
logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
- /* fetch failed. re-fetch should be done */
+ /** fetch failed. re-fetch should be done */
fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
@@ -465,14 +469,14 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
- /* retry logic */
+ /** retry logic */
val promise = Promise[Boolean]
val backOff = exponentialBackOff(tryNum)
scheduledThreadPool.schedule(new Runnable {
override def run(): Unit = {
val future = if (failedStatusCode == 0) {
// acquire Lock failed. other is mutating so this thread needs to re-fetch snapshotEdge.
- /* fetch failed. re-fetch should be done */
+ /** fetch failed. re-fetch should be done */
fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
@@ -505,7 +509,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
case 0 =>
fetchedSnapshotEdgeOpt match {
case None =>
- /*
+ /**
* no one has ever mutated this SN.
* (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
* pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1)
@@ -527,7 +531,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
case Some(snapshotEdge) =>
snapshotEdge.pendingEdgeOpt match {
case None =>
- /*
+ /**
* others finished commit on this SN. but there is no contention.
* (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges)
* pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ?
@@ -549,7 +553,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
case Some(pendingEdge) =>
val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
if (isLockExpired) {
- /*
+ /**
* if pendingEdge.ts == snapshotEdge.ts =>
* (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge))
* else =>
@@ -571,7 +575,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
} else {
- /*
+ /**
* others finished commit on this SN and there is currently contention.
* this can't be proceed so retry from re-fetch.
* throw EX
@@ -584,11 +588,11 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
case _ =>
- /*
+ /**
* statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock
*/
- /*
+ /**
* this succeed to lock this SN. keep doing on commit process.
* if SN.isEmpty =>
* no one never succed to commit on this SN.
@@ -828,90 +832,96 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
/** Delete All */
- protected def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest,
- queryResult: QueryResult,
+ protected def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepInnerResult,
requestTs: Long,
retryNum: Int): Future[Boolean] = {
- val queryParam = queryRequest.queryParam
- val zkQuorum = queryParam.label.hbaseZkAddr
- val futures = for {
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- } yield {
- /* reverted direction */
- val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
- indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- buildIncrementsAsync(indexEdge, -1L)
- }
- val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
- val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
- indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- buildIncrementsAsync(indexEdge, -1L)
+ if (stepInnerResult.isEmpty) Future.successful(true)
+ else {
+ val head = stepInnerResult.edgesWithScoreLs.head
+ val zkQuorum = head.edge.label.hbaseZkAddr
+ val futures = for {
+ edgeWithScore <- stepInnerResult.edgesWithScoreLs
+ (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+ } yield {
+ /** reverted direction */
+ val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+ indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ buildIncrementsAsync(indexEdge, -1L)
+ }
+ val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+ val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
+ indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ buildIncrementsAsync(indexEdge, -1L)
+ }
+ val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+ writeToStorage(zkQuorum, mutations, withWait = true)
}
- val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
- writeToStorage(zkQuorum, mutations, withWait = true)
- }
- Future.sequence(futures).map { rets => rets.forall(identity) }
+ Future.sequence(futures).map { rets => rets.forall(identity) }
+ }
}
- protected def buildEdgesToDelete(queryRequestWithResultLs: QueryRequestWithResult, requestTs: Long): QueryResult = {
- val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs).get
- val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore =>
+ protected def buildEdgesToDelete(stepInnerResult: StepInnerResult, requestTs: Long): StepInnerResult = {
+ val filtered = stepInnerResult.edgesWithScoreLs.filter { edgeWithScore =>
(edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
- }.map { edgeWithScore =>
- val label = queryRequest.queryParam.label
- val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
- case "strong" =>
- val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
- Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
- (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
- case _ =>
- val oldEdge = edgeWithScore.edge
- (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
- }
+ }
+ if (filtered.isEmpty) StepInnerResult.Empty
+ else {
+ val head = filtered.head
+ val label = head.edge.label
+ val edgeWithScoreLs = filtered.map { edgeWithScore =>
+ val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
+ case "strong" =>
+ val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
+ Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
+ (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
+ case _ =>
+ val oldEdge = edgeWithScore.edge
+ (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
+ }
- val copiedEdge =
- edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
+ val copiedEdge =
+ edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
- val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
-// logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
- edgeToDelete
+ val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
+ // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
+ edgeToDelete
+ }
+ //Degree edge?
+ StepInnerResult(edgeWithScoreLs, Nil, false)
}
-
- queryResult.copy(edgeWithScoreLs = edgeWithScoreLs)
}
- protected def deleteAllFetchedEdgesLs(queryRequestWithResultLs: Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = {
- val queryResultLs = queryRequestWithResultLs.map(_.queryResult)
- queryResultLs.foreach { queryResult =>
- if (queryResult.isFailure) throw new RuntimeException("fetched result is fallback.")
+ protected def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepInnerResult],
+ requestTs: Long): Future[(Boolean, Boolean)] = {
+ stepInnerResultLs.foreach { stepInnerResult =>
+ if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
}
val futures = for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs)
- if deleteQueryResult.edgeWithScoreLs.nonEmpty
+ stepInnerResult <- stepInnerResultLs
+ deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
+ if deleteStepInnerResult.edgesWithScoreLs.nonEmpty
} yield {
- val label = queryRequest.queryParam.label
+ val head = deleteStepInnerResult.edgesWithScoreLs.head
+ val label = head.edge.label
label.schemaVersion match {
case HBaseType.VERSION3 | HBaseType.VERSION4 =>
if (label.consistencyLevel == "strong") {
- /*
+ /**
* read: snapshotEdge on queryResult = O(N)
* write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
*/
- mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity))
+ mutateEdges(deleteStepInnerResult.edgesWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity))
} else {
- deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+ deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
}
case _ =>
- /*
+ /**
* read: x
* write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
*/
- deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+ deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
}
}
@@ -923,10 +933,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
}
- protected def fetchAndDeleteAll(query: Query, requestTs: Long): Future[(Boolean, Boolean)] = {
+ protected def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
val future = for {
- queryRequestWithResultLs <- getEdges(query)
- (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, requestTs)
+ stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_)))
+ (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
} yield {
// logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
(allDeleted, ret)
@@ -960,19 +970,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
val requestTs = ts
- val queryParams = for {
+ /** create query per label */
+ val queries = for {
label <- labels
} yield {
val labelWithDir = LabelWithDirection(label.id.get, dir)
- QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
+ val queryParam = QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
+ val step = Step(List(queryParam))
+ Query(srcVertices, Vector(step))
}
- val step = Step(queryParams.toList)
- val q = Query(srcVertices, Vector(step))
-
// Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
- val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) {
- fetchAndDeleteAll(q, requestTs)
+ val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
+ fetchAndDeleteAll(queries, requestTs)
} { case (allDeleted, deleteSuccess) =>
allDeleted && deleteSuccess
}.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
@@ -1039,7 +1049,9 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
queryParam: QueryParam,
prevScore: Double = 1.0,
isInnerCall: Boolean,
- parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+ parentEdges: Seq[EdgeWithScore],
+ startOffset: Int = 0,
+ len: Int = Int.MaxValue): Seq[EdgeWithScore] = {
if (kvs.isEmpty) Seq.empty
else {
val first = kvs.head
@@ -1050,7 +1062,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)
for {
- kv <- kvs
+ (kv, idx) <- kvs.zipWithIndex if idx >= startOffset && idx < startOffset + len
edge <-
if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, isInnerCall, parentEdges)
else toEdge(kv, queryParam, cacheElementOpt, parentEdges)
@@ -1071,19 +1083,6 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
/** End Of Parse Logic */
-// /** methods for consistency */
-// protected def writeAsyncSimple(zkQuorum: String, elementRpcs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
-// if (elementRpcs.isEmpty) {
-// Future.successful(true)
-// } else {
-// val futures = elementRpcs.map { rpc => writeToStorage(rpc, withWait) }
-// Future.sequence(futures).map(_.forall(identity))
-// }
-// }
-
-
- // def futureCache[T] = Cache[Long, (Long, T)]
-
protected def toRequestEdge(queryRequest: QueryRequest): Edge = {
val srcVertex = queryRequest.vertex
// val tgtVertexOpt = queryRequest.tgtVertexOpt
@@ -1095,7 +1094,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match {
case Some(tgtVertexId) => // _to is given.
- /* we use toSnapshotEdge so dont need to swap src, tgt */
+ /** we use toSnapshotEdge so we don't need to swap src, tgt */
val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion)
(src, tgt)
@@ -1135,27 +1134,26 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
}
- protected def fetchStep(orgQuery: Query, queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = {
- if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil)
+ protected def fetchStep(orgQuery: Query,
+ stepIdx: Int,
+ stepInnerResult: StepInnerResult): Future[StepInnerResult] = {
+ if (stepInnerResult.isEmpty) Future.successful(StepInnerResult.Empty)
else {
- val queryRequest = queryRequestWithResultsLs.head.queryRequest
- val q = orgQuery
- val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult)
+ val edgeWithScoreLs = stepInnerResult.edgesWithScoreLs
- val stepIdx = queryRequest.stepIdx + 1
+ val q = orgQuery
val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
val step = q.steps(stepIdx)
+
val alreadyVisited =
if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
- else Graph.alreadyVisitedVertices(queryResultsLs)
+ else Graph.alreadyVisitedVertices(stepInnerResult.edgesWithScoreLs)
- val groupedBy = queryResultsLs.flatMap { queryResult =>
- queryResult.edgeWithScoreLs.map { case edgeWithScore =>
- edgeWithScore.edge.tgtVertex -> edgeWithScore
- }
+ val groupedBy = edgeWithScoreLs.map { case edgeWithScore =>
+ edgeWithScore.edge.tgtVertex -> edgeWithScore
}.groupBy { case (vertex, edgeWithScore) => vertex }
val groupedByFiltered = for {
@@ -1178,39 +1176,48 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
queryParam <- step.queryParams
} yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore)
- Graph.filterEdges(fetches(queryRequests, prevStepTgtVertexIdEdges), alreadyVisited)(ec)
+ val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
+ Graph.filterEdges(orgQuery, stepIdx, queryRequests.map(_._1), fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited)(ec)
}
}
-
- protected def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = {
- for {
- queryRequestWithResultLs <- queryRequestWithResultLsFuture
- ret <- fetchStep(orgQuery, queryRequestWithResultLs)
- } yield ret
+ private def getEdgesStepInner(q: Query): Future[StepInnerResult] = {
+ Try {
+ if (q.steps.isEmpty) innerFallback
+ else {
+ // current stepIdx = -1
+ val startStepInnerResult = QueryResult.fromVertices(q)
+ q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
+ for {
+ prevStepInnerResult <- prevStepInnerResultFuture
+ currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult)
+ } yield currentStepInnerResult
+ }
+ }
+ } recover {
+ case e: Exception =>
+ logger.error(s"getEdgesAsync: $e", e)
+ innerFallback
+ } get
}
-
- def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = {
- val fallback = {
- val queryRequest = QueryRequest(query = q, stepIdx = 0, q.vertices.head, queryParam = QueryParam.Empty)
- Future.successful(q.vertices.map(v => QueryRequestWithResult(queryRequest, QueryResult())))
- }
+ def getEdges(q: Query): Future[StepResult] = {
Try {
-
if (q.steps.isEmpty) {
// TODO: this should be get vertex query.
fallback
} else {
- // current stepIdx = -1
- val startQueryResultLs = QueryResult.fromVertices(q)
- q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, step) =>
- fetchStepFuture(q, acc)
-// fetchStepFuture(q, acc).map { stepResults =>
-// step.queryParams.zip(stepResults).foreach { case (qParam, queryRequestWithResult) =>
-// val cursor = Base64.getEncoder.encodeToString(queryRequestWithResult.queryResult.tailCursor)
-// qParam.cursorOpt = Option(cursor)
-// }
-// stepResults
-// }
+ val filterOutFuture = q.queryOption.filterOutQuery match {
+ case None => innerFallback
+ case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+ }
+ for {
+ innerResult <- getEdgesStepInner(q)
+ filterOutInnerResult <- filterOutFuture
+ } yield {
+ val result = StepResult(graph, q.queryOption, innerResult)
+ if (filterOutInnerResult.isEmpty) result
+ else {
+ StepResult.filterOut(graph, q.queryOption, result, filterOutInnerResult)
+ }
}
}
} recover {
@@ -1220,7 +1227,34 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
} get
}
- def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = {
+ def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
+ val fallback = Future.successful(StepResult.Empty)
+
+ Try {
+ if (mq.queries.isEmpty) fallback
+ else {
+ val filterOutFuture = mq.queryOption.filterOutQuery match {
+ case None => innerFallback
+ case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+ }
+
+ val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
+ for {
+ multiQueryResults <- multiQueryFutures
+ filterOutInnerResult <- filterOutFuture
+ } yield {
+ val merged = StepResult.merges(mq.queryOption, multiQueryResults, mq.weights)
+ StepResult.filterOut(graph, mq.queryOption, merged, filterOutInnerResult)
+ }
+ }
+ } recover {
+ case e: Exception =>
+ logger.error(s"getEdgesAsync: $e", e)
+ fallback
+ } get
+ }
+
+ def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = {
val ts = System.currentTimeMillis()
val futures = for {
(srcVertex, tgtVertex, queryParam) <- params
@@ -1228,15 +1262,18 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
} yield {
fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
- val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
- val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
- val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
- val queryResult = QueryResult(edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0)))
- QueryRequestWithResult(queryRequest, queryResult)
+ edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0))
}
}
- Future.sequence(futures)
+ Future.sequence(futures).map { edgeWithScoreLs =>
+ val s2EdgeWithScoreLs = edgeWithScoreLs.flatMap { ls =>
+ ls.map { edgeWithScore =>
+ S2EdgeWithScore(edgeWithScore.edge, edgeWithScore.score)
+ }
+ }
+ StepResult(results = s2EdgeWithScoreLs, grouped = Nil, degreeEdges = Nil)
+ }
}
@@ -1266,6 +1303,13 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
}
+
+ protected def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+ val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
+ edgeWithScores.map { edgeWithScore =>
+ edgeWithScore.copy(score = edgeWithScore.score / sum)
+ }
+ }
/** end of query */
/** Mutation Builder */
@@ -1290,19 +1334,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
(edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
case (true, true) =>
- /* when there is no need to update. shouldUpdate == false */
+ /** when there is no need to update. shouldUpdate == false */
List.empty
case (true, false) =>
- /* no edges to delete but there is new edges to insert so increase degree by 1 */
+ /** no edges to delete but there are new edges to insert so increase degree by 1 */
edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
case (false, true) =>
- /* no edges to insert but there is old edges to delete so decrease degree by 1 */
+ /** no edges to insert but there are old edges to delete so decrease degree by 1 */
edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
case (false, false) =>
- /* update on existing edges so no change on degree */
+ /** update on existing edges so no change on degree */
List.empty
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 138216b..b52ba53 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -82,8 +82,9 @@ object AsynchbaseStorage {
}
-class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext)
- extends Storage[Deferred[QueryRequestWithResult]](config) {
+class AsynchbaseStorage(override val graph: Graph,
+ override val config: Config)(implicit ec: ExecutionContext)
+ extends Storage[Deferred[StepInnerResult]](graph, config) {
import Extensions.DeferOps
@@ -100,14 +101,16 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
private val emptyKeyValues = new util.ArrayList[KeyValue]()
private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
- import CanDefer._
-
/** Future Cache to squash request */
- private val futureCache = new DeferCache[QueryResult, Deferred, Deferred](config, QueryResult(), "FutureCache", useMetric = true)
+ private val futureCache = new DeferCache[StepInnerResult, Deferred, Deferred](config, StepInnerResult.Empty, "FutureCache", useMetric = true)
/** Simple Vertex Cache */
private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
+ private val zkQuorum = config.getString("hbase.zookeeper.quorum")
+ private val zkQuorumSlave =
+ if (config.hasPath("hbase.slave.zookeeper.quorum")) Option(config.getString("hbase.slave.zookeeper.quorum"))
+ else None
/**
* fire rpcs into proper hbase cluster using client and
@@ -241,7 +244,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
scanner.setMaxVersions(1)
- scanner.setMaxNumRows(queryParam.limit)
+ scanner.setMaxNumRows(queryParam.offset + queryParam.limit)
scanner.setMaxTimestamp(maxTs)
scanner.setMinTimestamp(minTs)
scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
@@ -280,21 +283,38 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
override def fetch(queryRequest: QueryRequest,
prevStepScore: Double,
isInnerCall: Boolean,
- parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = {
+ parentEdges: Seq[EdgeWithScore]): Deferred[StepInnerResult] = {
+
+ def fetchInner(hbaseRpc: AnyRef): Deferred[StepInnerResult] = {
+ val queryParam = queryRequest.queryParam
- def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
- val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
- val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
- sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
- } else edgeWithScores
- QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
-// QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty)))
+ val (startOffset, length) = queryParam.label.schemaVersion match {
+ case HBaseType.VERSION4 => (queryParam.offset, queryParam.limit)
+ case _ => (0, kvs.length)
+ }
+
+ val edgeWithScores = toEdges(kvs, queryParam, prevStepScore, isInnerCall, parentEdges, startOffset, length)
+ if (edgeWithScores.isEmpty) StepInnerResult.Empty
+ else {
+ val head = edgeWithScores.head
+ val (degreeEdges, indexEdges) =
+ if (head.edge.isDegree) (Seq(head), edgeWithScores.tail)
+ else (Nil, edgeWithScores)
+ val normalized =
+ if (queryRequest.queryParam.shouldNormalize) normalize(indexEdges)
+ else indexEdges
+
+ val sampled = if (queryRequest.queryParam.sample >= 0) {
+ sample(queryRequest, normalized, queryRequest.queryParam.sample)
+ } else normalized
+
+ StepInnerResult(edgesWithScoreLs = sampled, degreeEdges)
+ }
} recoverWith { ex =>
logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
- QueryResult(isFailure = true)
-// QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
+ StepInnerResult.Failure
}
}
@@ -302,27 +322,25 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
val cacheTTL = queryParam.cacheTTLInMillis
val request = buildRequest(queryRequest)
- val defer =
- if (cacheTTL <= 0) fetchInner(request)
- else {
- val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
- val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
- futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+ if (cacheTTL <= 0) fetchInner(request)
+ else {
+ val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+ val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+ futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
}
- defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)}
}
override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)],
- prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[QueryRequestWithResult]] = {
- val defers: Seq[Deferred[QueryRequestWithResult]] = for {
+ prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[StepInnerResult]] = {
+ val defers: Seq[Deferred[StepInnerResult]] = for {
(queryRequest, prevStepScore) <- queryRequestWithScoreLs
parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
} yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
- val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers)
+ val grouped: Deferred[util.ArrayList[StepInnerResult]] = Deferred.group(defers)
grouped withCallback {
- queryResults: util.ArrayList[QueryRequestWithResult] =>
+ queryResults: util.ArrayList[StepInnerResult] =>
queryResults.toIndexedSeq
} toFuture
}
@@ -371,47 +389,56 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
}
- override def createTable(zkAddr: String,
+ override def createTable(_zkAddr: String,
tableName: String,
cfs: List[String],
regionMultiplier: Int,
ttl: Option[Int],
compressionAlgorithm: String): Unit = {
- logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
- val admin = getAdmin(zkAddr)
- val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
- try {
- if (!admin.tableExists(TableName.valueOf(tableName))) {
- try {
- val desc = new HTableDescriptor(TableName.valueOf(tableName))
- desc.setDurability(Durability.ASYNC_WAL)
- for (cf <- cfs) {
- val columnDesc = new HColumnDescriptor(cf)
- .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
- .setBloomFilterType(BloomType.ROW)
- .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
- .setMaxVersions(1)
- .setTimeToLive(2147483647)
- .setMinVersions(0)
- .setBlocksize(32768)
- .setBlockCacheEnabled(true)
- if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
- desc.addFamily(columnDesc)
- }
+ /** TODO: Decide if we will allow each app server to connect to multiple hbase cluster */
+ for {
+ zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
+ } {
+ logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
+ val admin = getAdmin(zkAddr)
+ val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+ try {
+ if (!admin.tableExists(TableName.valueOf(tableName))) {
+ try {
+ val desc = new HTableDescriptor(TableName.valueOf(tableName))
+ desc.setDurability(Durability.ASYNC_WAL)
+ for (cf <- cfs) {
+ val columnDesc = new HColumnDescriptor(cf)
+ .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+ .setBloomFilterType(BloomType.ROW)
+ .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+ .setMaxVersions(1)
+ .setTimeToLive(2147483647)
+ .setMinVersions(0)
+ .setBlocksize(32768)
+ .setBlockCacheEnabled(true)
+ if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+ desc.addFamily(columnDesc)
+ }
- if (regionCount <= 1) admin.createTable(desc)
- else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
- } catch {
- case e: Throwable =>
- logger.error(s"$zkAddr, $tableName failed with $e", e)
- throw e
+ if (regionCount <= 1) admin.createTable(desc)
+ else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+ } catch {
+ case e: Throwable =>
+ logger.error(s"$zkAddr, $tableName failed with $e", e)
+ throw e
+ }
+ } else {
+ logger.info(s"$zkAddr, $tableName, $cfs already exist.")
}
- } else {
- logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+ } catch {
+ case e: Throwable =>
+ logger.error(s"$zkAddr, $tableName failed with $e", e)
+ throw e
+ } finally {
+ admin.close()
+ admin.getConnection.close()
}
- } finally {
- admin.close()
- admin.getConnection.close()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index 83d4338..c700e53 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -63,5 +63,5 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
else if (indexEdge.op == GraphUtil.operations("incrementCount"))
Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
else propsToKeyValues(indexEdge.metas.toSeq)
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
index 4149540..b402c0f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
@@ -51,6 +51,8 @@ object logger {
private val logger = LoggerFactory.getLogger("application")
private val errorLogger = LoggerFactory.getLogger("error")
private val metricLogger = LoggerFactory.getLogger("metrics")
+ private val queryLogger = LoggerFactory.getLogger("query")
+ private val malformedLogger = LoggerFactory.getLogger("malformed")
def metric[T: Loggable](msg: => T) = metricLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
@@ -61,6 +63,10 @@ object logger {
def error[T: Loggable](msg: => T, exception: => Throwable) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
def error[T: Loggable](msg: => T) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg))
+
+ def query[T: Loggable](msg: => T) = queryLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
+
+ def malformed[T: Loggable](msg: => T, exception: => Throwable) = malformedLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
index a018c01..6933320 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
@@ -19,10 +19,12 @@
package org.apache.s2graph.core
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
import org.apache.s2graph.core.utils.logger
import org.scalatest.FunSuite
+import play.api.libs.json.{JsObject, Json}
class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
initTests()
@@ -39,7 +41,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
val (srcId, tgtId, labelName) = ("1", "2", testLabelName)
val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield {
- Management.toEdge(ts.toLong, op, srcId, tgtId, labelName, "out", props).toLogString
+ val properties = fromJsonToProperties(Json.parse(props).as[JsObject])
+ Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, op).toLogString
}).mkString("\n")
val expected = Seq(