You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 16:01:22 UTC
[1/3] incubator-s2graph git commit: [S2GRAPH-121]: Create `Result` class to hold traverse result edges.
Repository: incubator-s2graph
Updated Branches:
refs/heads/master b5908311a -> 8dbb9a3ee
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
index fda9991..6054d67 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
@@ -20,27 +20,27 @@
package org.apache.s2graph.core.Integrate
import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsObject, Json}
class CrudTest extends IntegrateCommon {
import CrudHelper._
import TestUtil._
- test("test CRUD") {
- var tcNum = 0
- var tcString = ""
- var bulkQueries = List.empty[(Long, String, String)]
- var expected = Map.empty[String, String]
-
- val curTime = System.currentTimeMillis
- val t1 = curTime + 0
- val t2 = curTime + 1
- val t3 = curTime + 2
- val t4 = curTime + 3
- val t5 = curTime + 4
-
- val tcRunner = new CrudTestRunner()
- tcNum = 1
+ var tcString = ""
+ var bulkQueries = List.empty[(Long, String, String)]
+ var expected = Map.empty[String, String]
+
+ val curTime = System.currentTimeMillis
+ val t1 = curTime + 0
+ val t2 = curTime + 1
+ val t3 = curTime + 2
+ val t4 = curTime + 3
+ val t5 = curTime + 4
+
+ val tcRunner = new CrudTestRunner()
+ test("1: [t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test") {
+ val tcNum = 1
tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test "
bulkQueries = List(
@@ -50,8 +50,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
- tcNum = 2
+ }
+ test("2: [t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test") {
+ val tcNum = 2
tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test "
bulkQueries = List(
(t1, "insert", "{\"time\": 10}"),
@@ -60,8 +61,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
- tcNum = 3
+ }
+ test("3: [t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test") {
+ val tcNum = 3
tcString = "[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test "
bulkQueries = List(
(t3, "insert", "{\"time\": 10, \"weight\": 20}"),
@@ -70,8 +72,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
- tcNum = 4
+ }
+ test("4: [t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test") {
+ val tcNum = 4
tcString = "[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test "
bulkQueries = List(
(t3, "insert", "{\"time\": 10, \"weight\": 20}"),
@@ -80,8 +83,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
- tcNum = 5
+ }
+ test("5: [t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test") {
+ val tcNum = 5
tcString = "[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test"
bulkQueries = List(
(t2, "delete", ""),
@@ -90,8 +94,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
- tcNum = 6
+ }
+ test("6: [t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test") {
+ val tcNum = 6
tcString = "[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test "
bulkQueries = List(
(t2, "delete", ""),
@@ -100,8 +105,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
- tcNum = 7
+ }
+ test("7: [t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test ") {
+ val tcNum = 7
tcString = "[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test "
bulkQueries = List(
(t1, "update", "{\"time\": 10}"),
@@ -110,7 +116,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
- tcNum = 8
+ }
+ test("8: [t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test ") {
+ val tcNum = 8
tcString = "[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test "
bulkQueries = List(
(t1, "update", "{\"time\": 10}"),
@@ -119,7 +127,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
- tcNum = 9
+ }
+ test("9: [t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test") {
+ val tcNum = 9
tcString = "[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test "
bulkQueries = List(
(t2, "delete", ""),
@@ -128,7 +138,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
- tcNum = 10
+ }
+ test("10: [t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test") {
+ val tcNum = 10
tcString = "[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test"
bulkQueries = List(
(t2, "delete", ""),
@@ -137,7 +149,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
- tcNum = 11
+ }
+ test("11: [t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test") {
+ val tcNum = 11
tcString = "[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test "
bulkQueries = List(
(t3, "update", "{\"time\": 10, \"weight\": 20}"),
@@ -146,7 +160,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
- tcNum = 12
+ }
+ test("12: [t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test") {
+ val tcNum = 12
tcString = "[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test "
bulkQueries = List(
(t3, "update", "{\"time\": 10, \"weight\": 20}"),
@@ -155,8 +171,9 @@ class CrudTest extends IntegrateCommon {
expected = Map("time" -> "10", "weight" -> "20")
tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
- tcNum = 13
+ }
+ test("13: [t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test ") {
+ val tcNum = 13
tcString = "[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test "
bulkQueries = List(
(t5, "update", "{\"is_blocked\": true}"),
@@ -169,6 +186,15 @@ class CrudTest extends IntegrateCommon {
tcRunner.run(tcNum, tcString, bulkQueries, expected)
}
+ test("14 - test lock expire") {
+ for {
+ labelName <- List(testLabelName, testLabelName2)
+ } {
+ val id = 0
+ tcRunner.expireTC(labelName, id)
+ }
+ }
+
object CrudHelper {
@@ -191,7 +217,7 @@ class CrudTest extends IntegrateCommon {
val bulkEdges = (for ((ts, op, props) <- opWithProps) yield {
TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props)
})
-
+ println(s"${bulkEdges.mkString("\n")}")
TestUtil.insertEdgesSync(bulkEdges: _*)
for {
@@ -210,6 +236,7 @@ class CrudTest extends IntegrateCommon {
val jsResult = TestUtil.getEdgesSync(query)
val results = jsResult \ "results"
+
val deegrees = (jsResult \ "degrees").as[List[JsObject]]
val propsLs = (results \\ "props").seq
(deegrees.head \ LabelMeta.degree.name).as[Int] should be(1)
@@ -229,6 +256,63 @@ class CrudTest extends IntegrateCommon {
}
}
+ def expireTC(labelName: String, id: Int) = {
+ var i = 1
+ val label = Label.findByName(labelName).get
+ val serviceName = label.serviceName
+ val columnName = label.srcColumnName
+ val id = 0
+
+ while (i < 1000) {
+ val bulkEdges = Seq(TestUtil.toEdge(i, "u", "e", id, id, testLabelName, Json.obj("time" -> 10).toString()))
+ val rets = TestUtil.insertEdgesSync(bulkEdges: _*)
+
+
+ val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id)
+
+ if (!rets.forall(identity)) {
+ Thread.sleep(graph.storage.LockExpireDuration + 100)
+ /** expect current request would be ignored */
+ val bulkEdges = Seq(TestUtil.toEdge(i-1, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString()))
+ val rets = TestUtil.insertEdgesSync(bulkEdges: _*)
+ if (rets.forall(identity)) {
+ // check
+ val jsResult = TestUtil.getEdgesSync(queryJson)
+ (jsResult \\ "time").head.as[Int] should be(10)
+ logger.debug(jsResult)
+ i = 100000
+ }
+ }
+
+ i += 1
+ }
+
+ i = 1
+ while (i < 1000) {
+ val bulkEdges = Seq(TestUtil.toEdge(i, "u", "e", id, id, testLabelName, Json.obj("time" -> 10).toString()))
+ val rets = TestUtil.insertEdgesSync(bulkEdges: _*)
+
+
+ val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id)
+
+ if (!rets.forall(identity)) {
+ Thread.sleep(graph.storage.LockExpireDuration + 100)
+ /** expect current request would be applied */
+ val bulkEdges = Seq(TestUtil.toEdge(i+1, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString()))
+ val rets = TestUtil.insertEdgesSync(bulkEdges: _*)
+ if (rets.forall(identity)) {
+ // check
+ val jsResult = TestUtil.getEdgesSync(queryJson)
+ (jsResult \\ "time").head.as[Int] should be(20)
+ logger.debug(jsResult)
+ i = 100000
+ }
+ }
+
+ i += 1
+ }
+ }
+
def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = Json.parse(
s""" { "srcVertices": [
{ "serviceName": "$serviceName",
@@ -240,6 +324,15 @@ class CrudTest extends IntegrateCommon {
"offset": 0,
"limit": 10,
"cacheTTL": $cacheTTL }]]}""")
+
+ def querySnapshotEdgeJson(serviceName: String, columnName: String, labelName: String, id: Int) = Json.parse(
+ s""" { "srcVertices": [
+ { "serviceName": "$serviceName",
+ "columnName": "$columnName",
+ "id": $id } ],
+ "steps": [ [ {
+ "label": "$labelName",
+ "_to": $id }]]}""")
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
index 225d396..b341ec5 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -43,7 +43,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
config = ConfigFactory.load()
graph = new Graph(config)(ExecutionContext.Implicits.global)
management = new Management(graph)
- parser = new RequestParser(graph.config)
+ parser = new RequestParser(graph)
initTestData()
}
@@ -120,7 +120,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
def deleteAllSync(jsValue: JsValue) = {
val future = Future.sequence(jsValue.as[Seq[JsValue]] map { json =>
val (labels, direction, ids, ts, vertices) = parser.toDeleteParam(json)
- val future = graph.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)
+ val srcVertices = vertices
+ val future = graph.deleteAllAdjacentEdges(srcVertices.toList, labels, GraphUtil.directions(direction), ts)
future
})
@@ -131,10 +132,13 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
def getEdgesSync(queryJson: JsValue): JsValue = {
logger.info(Json.prettyPrint(queryJson))
val restHandler = new RestHandler(graph)
- Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toSimpleVertexArrJson), HttpRequestWaitingTime)
+ val result = Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toJson), HttpRequestWaitingTime)
+ logger.debug(s"${Json.prettyPrint(result)}")
+ result
}
def insertEdgesSync(bulkEdges: String*) = {
+ logger.debug(s"${bulkEdges.mkString("\n")}")
val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true)
val jsResult = Await.result(req, HttpRequestWaitingTime)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
index 9c52b32..54bb12c 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
@@ -307,7 +307,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
"label": "$testLabelName",
"direction": "in",
"offset": 0,
- "limit": 10
+ "limit": 1000
}
]]
}""".stripMargin)
@@ -328,54 +328,54 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
-// test("pagination and _to") {
-// def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse(
-// s"""
-// { "srcVertices": [
-// { "serviceName": "${testServiceName}",
-// "columnName": "${testColumnName}",
-// "id": ${id}
-// }],
-// "steps": [
-// [ {
-// "label": "${testLabelName}",
-// "direction": "out",
-// "offset": $offset,
-// "limit": $limit,
-// "_to": $to
-// }
-// ]]
-// }
-// """)
-//
-// val src = System.currentTimeMillis().toInt
-//
-// val bulkEdges = Seq(
-// toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
-// toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
-// toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)),
-// toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40))
-// )
-// insertEdgesSync(bulkEdges: _*)
-//
-// var result = getEdgesSync(querySingle(src, offset = 0, limit = 2))
-// var edges = (result \ "results").as[List[JsValue]]
-//
-// edges.size should be(2)
-// (edges(0) \ "to").as[Long] should be(4)
-// (edges(1) \ "to").as[Long] should be(3)
-//
-// result = getEdgesSync(querySingle(src, offset = 1, limit = 2))
-//
-// edges = (result \ "results").as[List[JsValue]]
-// edges.size should be(2)
-// (edges(0) \ "to").as[Long] should be(3)
-// (edges(1) \ "to").as[Long] should be(2)
-//
-// result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1))
-// edges = (result \ "results").as[List[JsValue]]
-// edges.size should be(1)
-// }
+ test("pagination and _to") {
+ def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse(
+ s"""
+ { "srcVertices": [
+ { "serviceName": "${testServiceName}",
+ "columnName": "${testColumnName}",
+ "id": ${id}
+ }],
+ "steps": [
+ [ {
+ "label": "${testLabelName}",
+ "direction": "out",
+ "offset": $offset,
+ "limit": $limit,
+ "_to": $to
+ }
+ ]]
+ }
+ """)
+
+ val src = System.currentTimeMillis().toInt
+
+ val bulkEdges = Seq(
+ toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
+ toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
+ toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)),
+ toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40))
+ )
+ insertEdgesSync(bulkEdges: _*)
+
+ var result = getEdgesSync(querySingle(src, offset = 0, limit = 2))
+ var edges = (result \ "results").as[List[JsValue]]
+
+ edges.size should be(2)
+ (edges(0) \ "to").as[Long] should be(4)
+ (edges(1) \ "to").as[Long] should be(3)
+
+ result = getEdgesSync(querySingle(src, offset = 1, limit = 2))
+
+ edges = (result \ "results").as[List[JsValue]]
+ edges.size should be(2)
+ (edges(0) \ "to").as[Long] should be(3)
+ (edges(1) \ "to").as[Long] should be(2)
+
+ result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1))
+ edges = (result \ "results").as[List[JsValue]]
+ edges.size should be(1)
+ }
test("order by") {
def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj(
@@ -907,7 +907,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
""".stripMargin
)
- def querySingleVertexWithOp(id: String, op: String, shrinkageVal: Long) = Json.parse(
+ def queryWithOp(ids: Seq[String], op: String, shrinkageVal: Long) = Json.parse(
s"""{
| "limit": 10,
| "groupBy": ["from"],
@@ -916,7 +916,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
| {
| "serviceName": "$testServiceName",
| "columnName": "$testColumnName",
- | "id": $id
+ | "ids": [${ids.mkString(",")}]
| }
| ],
| "steps": [
@@ -949,47 +949,6 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
""".stripMargin
)
- def queryMultiVerticesWithOp(id: String, id2: String, op: String, shrinkageVal: Long) = Json.parse(
- s"""{
- | "limit": 10,
- | "groupBy": ["from"],
- | "duplicate": "sum",
- | "srcVertices": [
- | {
- | "serviceName": "$testServiceName",
- | "columnName": "$testColumnName",
- | "ids": [$id, $id2]
- | }
- | ],
- | "steps": [
- | {
- | "step": [
- | {
- | "label": "$testLabelName",
- | "direction": "out",
- | "offset": 0,
- | "limit": 10,
- | "groupBy": ["from"],
- | "duplicate": "countSum",
- | "transform": [["_from"]]
- | }
- | ]
- | }, {
- | "step": [
- | {
- | "label": "$testLabelName2",
- | "direction": "out",
- | "offset": 0,
- | "limit": 10,
- | "scorePropagateOp": "$op",
- | "scorePropagateShrinkage": $shrinkageVal
- | }
- | ]
- | }
- | ]
- |}
- """.stripMargin
- )
val testId = "-30000"
val testId2 = "-4000"
@@ -1014,15 +973,15 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
val secondStepEdgeCount = 4l
var shrinkageVal = 10l
- var rs = getEdgesSync(querySingleVertexWithOp(testId, "divide", shrinkageVal))
- logger.debug(Json.prettyPrint(rs))
+ var rs = getEdgesSync(queryWithOp(Seq(testId), "divide", shrinkageVal))
+
var results = (rs \ "results").as[List[JsValue]]
results.size should be(1)
var scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal)
(results(0) \ "scoreSum").as[Double] should be(scoreSum)
- rs = getEdgesSync(queryMultiVerticesWithOp(testId, testId2, "divide", shrinkageVal))
- logger.debug(Json.prettyPrint(rs))
+ rs = getEdgesSync(queryWithOp(Seq(testId, testId2), "divide", shrinkageVal))
+
results = (rs \ "results").as[List[JsValue]]
results.size should be(2)
scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal)
@@ -1033,21 +992,21 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
// check for divide zero case
shrinkageVal = 30l
rs = getEdgesSync(queryWithPropertyOp(testId, "divide", shrinkageVal))
- logger.debug(Json.prettyPrint(rs))
+
results = (rs \ "results").as[List[JsValue]]
results.size should be(1)
(results(0) \ "scoreSum").as[Double] should be(0)
// "plus" operation
- rs = getEdgesSync(querySingleVertexWithOp(testId, "plus", shrinkageVal))
- logger.debug(Json.prettyPrint(rs))
+ rs = getEdgesSync(queryWithOp(Seq(testId), "plus", shrinkageVal))
+
results = (rs \ "results").as[List[JsValue]]
results.size should be(1)
scoreSum = (firstStepEdgeCount + 1) * secondStepEdgeCount
(results(0) \ "scoreSum").as[Long] should be(scoreSum)
// "multiply" operation
- rs = getEdgesSync(querySingleVertexWithOp(testId, "multiply", shrinkageVal))
+ rs = getEdgesSync(queryWithOp(Seq(testId), "multiply", shrinkageVal))
logger.debug(Json.prettyPrint(rs))
results = (rs \ "results").as[List[JsValue]]
results.size should be(1)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
index 99b56f7..6092fe4 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
@@ -141,11 +141,12 @@ class StrongLabelDeleteTest extends IntegrateCommon {
test("large degrees") {
val labelName = testLabelName2
val dir = "out"
+ val minSize = 0
val maxSize = 100
val deleteSize = 10
val numOfConcurrentBatch = 100
- val src = System.currentTimeMillis()
- val tgts = (0 until maxSize).map { ith => src + ith }
+ val src = 1092983
+ val tgts = (minSize until maxSize).map { ith => src + ith }
val deleteTgts = Random.shuffle(tgts).take(deleteSize)
val insertRequests = tgts.map { tgt =>
Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t")
@@ -181,11 +182,12 @@ class StrongLabelDeleteTest extends IntegrateCommon {
test("deleteAll") {
val labelName = testLabelName2
val dir = "out"
- val maxSize = 100
+ val minSize = 200
+ val maxSize = 300
val deleteSize = 10
val numOfConcurrentBatch = 100
- val src = System.currentTimeMillis()
- val tgts = (0 until maxSize).map { ith => src + ith }
+ val src = 192338237
+ val tgts = (minSize until maxSize).map { ith => src + ith }
val deleteTgts = Random.shuffle(tgts).take(deleteSize)
val insertRequests = tgts.map { tgt =>
Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t")
@@ -220,7 +222,7 @@ class StrongLabelDeleteTest extends IntegrateCommon {
// val labelName = testLabelName
val maxTgtId = 10
val batchSize = 10
- val testNum = 100
+ val testNum = 10
val numOfBatch = 10
def testInner(startTs: Long, src: Long) = {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
index a1bff68..603ca12 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
@@ -31,6 +31,8 @@ class VertexTestHelper extends IntegrateCommon {
import TestUtil._
import VertexTestHelper._
+
+
test("vertex") {
val ids = (7 until 20).map(tcNum => tcNum * 1000 + 0)
val (serviceName, columnName) = (testServiceName, testColumnName)
@@ -40,9 +42,10 @@ class VertexTestHelper extends IntegrateCommon {
println(payload)
val vertices = parser.toVertices(payload, "insert", Option(serviceName), Option(columnName))
- Await.result(graph.mutateVertices(vertices, withWait = true), HttpRequestWaitingTime)
+ val srcVertices = vertices
+ Await.result(graph.mutateVertices(srcVertices, withWait = true), HttpRequestWaitingTime)
- val res = graph.getVertices(vertices).map { vertices =>
+ val res = graph.getVertices(srcVertices).map { vertices =>
PostProcess.verticesToJson(vertices)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
index 3f76d59..d62dee8 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,11 +33,12 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
import WeakLabelDeleteHelper._
test("test weak consistency select") {
+ insertEdgesSync(bulkEdges(): _*)
var result = getEdgesSync(query(0))
- println(result)
+
(result \ "results").as[List[JsValue]].size should be(4)
result = getEdgesSync(query(10))
- println(result)
+
(result \ "results").as[List[JsValue]].size should be(2)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
index 419e9c4..bab6e03 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
@@ -19,10 +19,11 @@
package org.apache.s2graph.core
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
import org.scalatest.{FunSuite, Matchers}
+
class JsonParserTest extends FunSuite with Matchers with TestCommon {
import InnerVal._
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
index 06af38c..61d1096 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.types.LabelWithDirection
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike, HBaseSerializable, LabelWithDirection}
import org.scalatest.{FunSuite, Matchers}
class QueryParamTest extends FunSuite with Matchers with TestCommon {
@@ -101,5 +101,59 @@ class QueryParamTest extends FunSuite with Matchers with TestCommon {
println(s">> diff: $duration")
}
+ test("QueryParam interval min/max bytes padding test") {
+ import HBaseSerializable._
+ val queryParam = QueryParam.Empty
+ def compare(_from: Seq[InnerValLike], _to: Seq[InnerValLike], _value: Seq[InnerValLike]): Boolean = {
+ val len = _from.length.toByte
+ val from = _from.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal }
+ val to = _to.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal }
+ val value = _value.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal }
+
+ val (fromBytes, toBytes) = queryParam.paddingInterval(len, from, to)
+ val valueBytes = propsToBytes(value)
+
+ val validFrom = Bytes.compareTo(fromBytes, valueBytes) <= 0
+ val validTo = Bytes.compareTo(toBytes, valueBytes) >= 0
+
+ val res = validFrom && validTo
+ // if (!res) logger.error(s"from: $validFrom, to: $validTo, from: ${_from} to: ${_to} value: ${_value}")
+ res
+ }
+
+ val v = "v3"
+ compare(
+ Seq(InnerVal.withLong(0L, v)),
+ Seq(InnerVal.withLong(0L, v)),
+ Seq(InnerVal.withLong(0L, v))) shouldBe true
+
+ compare(
+ Seq(InnerVal.withLong(0L, v)),
+ Seq(InnerVal.withLong(0L, v)),
+ Seq(InnerVal.withLong(1L, v))) shouldBe false
+
+ compare(
+ Seq(InnerVal.withLong(1L, v)),
+ Seq(InnerVal.withLong(1L, v)),
+ Seq(InnerVal.withLong(0L, v))) shouldBe false
+
+ compare(
+ Seq(InnerVal.withLong(0L, v)),
+ Seq(InnerVal.withLong(1L, v)),
+ Seq(InnerVal.withLong(2L, v))) shouldBe false
+
+ val testNum = 100000
+ val tests = for {
+ n <- 0 to testNum
+ min = scala.util.Random.nextInt(Int.MaxValue / 2) + 1
+ max = min + scala.util.Random.nextInt(min)
+ value = min + scala.util.Random.nextInt(max - min + 1)
+ } yield compare(
+ Seq(InnerVal.withLong(min, v)),
+ Seq(InnerVal.withLong(max, v)),
+ Seq(InnerVal.withLong(value, v)))
+
+ tests.forall(identity) shouldBe true
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index 33a901d..584a641 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -146,7 +146,7 @@ trait TestCommonWithModels {
isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4", None)
management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
- isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None)
+ isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4", None)
management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
index 8ba9ea2..05dcd30 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
@@ -23,41 +23,55 @@ import play.api.libs.json.JsNumber
import play.libs.Json
class JsonBenchmarkSpec extends BenchmarkCommon {
- "to json" >> {
- "json benchmark" >> {
- duration("map to json") {
- (0 to 10) foreach { n =>
- val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap
- Json.toJson(numberMaps)
- }
+ "JsonBenchSpec" should {
+
+ "Json Append" >> {
+ import play.api.libs.json.{Json, _}
+ val numberJson = Json.toJson((0 to 1000).map { i => s"$i" -> JsNumber(i * i) }.toMap).as[JsObject]
+
+ /** dummy warm-up **/
+ (0 to 10000) foreach { n =>
+ Json.obj(s"$n" -> "dummy") ++ numberJson
+ }
+ (0 to 10000) foreach { n =>
+ Json.obj(s"$n" -> numberJson)
}
- duration("directMakeJson") {
- (0 to 10) foreach { n =>
- var jsObj = play.api.libs.json.Json.obj()
- (0 to 10).foreach { n =>
- jsObj += (n.toString -> JsNumber(n * n))
- }
+ duration("Append by JsObj ++ JsObj ") {
+ (0 to 100000) foreach { n =>
+ numberJson ++ Json.obj(s"$n" -> "dummy")
}
}
- duration("map to json 2") {
- (0 to 50) foreach { n =>
- val numberMaps = (0 to 10).map { n => (n.toString -> JsNumber(n * n)) }.toMap
- Json.toJson(numberMaps)
+ duration("Append by Json.obj(newJson -> JsObj)") {
+ (0 to 100000) foreach { n =>
+ Json.obj(s"$n" -> numberJson)
}
}
+ true
+ }
+ }
- duration("directMakeJson 2") {
- (0 to 50) foreach { n =>
- var jsObj = play.api.libs.json.Json.obj()
- (0 to 10).foreach { n =>
- jsObj += (n.toString -> JsNumber(n * n))
- }
+ "Make Json" >> {
+ duration("map to json") {
+ (0 to 10000) foreach { n =>
+ val numberMaps = (0 to 100).map { n =>
+ n.toString -> JsNumber(n * n)
+ }.toMap
+
+ Json.toJson(numberMaps)
+ }
+ }
+
+ duration("direct") {
+ (0 to 10000) foreach { n =>
+ var jsObj = play.api.libs.json.Json.obj()
+
+ (0 to 100).foreach { n =>
+ jsObj += (n.toString -> JsNumber(n * n))
}
}
- true
}
true
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
index a8777fb..ec19641 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
@@ -1,102 +1,105 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.benchmark
-
-import scala.annotation.tailrec
-import scala.util.Random
-
-class SamplingBenchmarkSpec extends BenchmarkCommon {
- "sample" should {
-
- "sample benchmark" in {
- @tailrec
- def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
- if (set.size == n) set
- else randomInt(n, range, set + Random.nextInt(range))
- }
-
- // sample using random array
- def randomArraySample[T](num: Int, ls: List[T]): List[T] = {
- val randomNum = randomInt(num, ls.size)
- var sample = List.empty[T]
- var idx = 0
- ls.foreach { e =>
- if (randomNum.contains(idx)) sample = e :: sample
- idx += 1
- }
- sample
- }
-
- // sample using shuffle
- def shuffleSample[T](num: Int, ls: List[T]): List[T] = {
- Random.shuffle(ls).take(num)
- }
-
- // sample using random number generation
- def rngSample[T](num: Int, ls: List[T]): List[T] = {
- var sampled = List.empty[T]
- val N = ls.size // population
- var t = 0 // total input records dealt with
- var m = 0 // number of items selected so far
-
- while (m < num) {
- val u = Random.nextDouble()
- if ((N - t) * u < num - m) {
- sampled = ls(t) :: sampled
- m += 1
- }
- t += 1
- }
- sampled
- }
-
- // test data
- val testLimit = 10000
- val testNum = 10
- val testData = (0 to 1000).toList
-
- // dummy for warm-up
- (0 to testLimit) foreach { n =>
- randomArraySample(testNum, testData)
- shuffleSample(testNum, testData)
- rngSample(testNum, testData)
- }
-
- duration("Random Array Sampling") {
- (0 to testLimit) foreach { _ =>
- val sampled = randomArraySample(testNum, testData)
- }
- }
-
- duration("Shuffle Sampling") {
- (0 to testLimit) foreach { _ =>
- val sampled = shuffleSample(testNum, testData)
- }
- }
-
- duration("RNG Sampling") {
- (0 to testLimit) foreach { _ =>
- val sampled = rngSample(testNum, testData)
- }
- }
- true
- }
- }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//
+//package org.apache.s2graph.rest.play.benchmark
+//
+//import play.api.test.{FakeApplication, PlaySpecification, WithApplication}
+//
+//import scala.annotation.tailrec
+//import scala.util.Random
+//
+//class SamplingBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
+// "sample" should {
+// implicit val app = FakeApplication()
+//
+// "sample benchmark" in new WithApplication(app) {
+// @tailrec
+// def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
+// if (set.size == n) set
+// else randomInt(n, range, set + Random.nextInt(range))
+// }
+//
+// // sample using random array
+// def randomArraySample[T](num: Int, ls: List[T]): List[T] = {
+// val randomNum = randomInt(num, ls.size)
+// var sample = List.empty[T]
+// var idx = 0
+// ls.foreach { e =>
+// if (randomNum.contains(idx)) sample = e :: sample
+// idx += 1
+// }
+// sample
+// }
+//
+// // sample using shuffle
+// def shuffleSample[T](num: Int, ls: List[T]): List[T] = {
+// Random.shuffle(ls).take(num)
+// }
+//
+// // sample using random number generation
+// def rngSample[T](num: Int, ls: List[T]): List[T] = {
+// var sampled = List.empty[T]
+// val N = ls.size // population
+// var t = 0 // total input records dealt with
+// var m = 0 // number of items selected so far
+//
+// while (m < num) {
+// val u = Random.nextDouble()
+// if ((N - t) * u < num - m) {
+// sampled = ls(t) :: sampled
+// m += 1
+// }
+// t += 1
+// }
+// sampled
+// }
+//
+// // test data
+// val testLimit = 10000
+// val testNum = 10
+// val testData = (0 to 1000).toList
+//
+// // dummy for warm-up
+// (0 to testLimit) foreach { n =>
+// randomArraySample(testNum, testData)
+// shuffleSample(testNum, testData)
+// rngSample(testNum, testData)
+// }
+//
+// duration("Random Array Sampling") {
+// (0 to testLimit) foreach { _ =>
+// val sampled = randomArraySample(testNum, testData)
+// }
+// }
+//
+// duration("Shuffle Sampling") {
+// (0 to testLimit) foreach { _ =>
+// val sampled = shuffleSample(testNum, testData)
+// }
+// }
+//
+// duration("RNG Sampling") {
+// (0 to testLimit) foreach { _ =>
+// val sampled = rngSample(testNum, testData)
+// }
+// }
+// true
+// }
+// }
+//}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
index a654a83..c8f65bf 100644
--- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -25,7 +25,7 @@ import com.typesafe.config.ConfigFactory
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel._
-import io.netty.channel.epoll.{EpollServerSocketChannel, EpollEventLoopGroup}
+import io.netty.channel.epoll.{EpollEventLoopGroup, EpollServerSocketChannel}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
@@ -36,14 +36,14 @@ import io.netty.util.CharsetUtil
import org.apache.s2graph.core.GraphExceptions.BadQueryException
import org.apache.s2graph.core.mysqls.Experiment
import org.apache.s2graph.core.rest.RestHandler
-import org.apache.s2graph.core.rest.RestHandler.HandlerResult
+import org.apache.s2graph.core.rest.RestHandler.{CanLookup, HandlerResult}
import org.apache.s2graph.core.utils.Extensions._
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.core.{Graph, JSONParser, PostProcess}
import play.api.libs.json._
import scala.collection.mutable
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source
import scala.util.{Failure, Success, Try}
import scala.language.existentials
@@ -58,6 +58,10 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
val NotFound = HttpResponseStatus.NOT_FOUND
val InternalServerError = HttpResponseStatus.INTERNAL_SERVER_ERROR
+ /**
+ * `CanLookup` instance for Netty `HttpHeaders`.
+ * The raw `get` result is wrapped in `Option(...)`, so a missing header (null) becomes `None`.
+ * The type is declared explicitly so implicit resolution does not depend on inference
+ * or on where in the class this val is defined.
+ */
+ implicit val nettyHeadersLookup: CanLookup[HttpHeaders] = new CanLookup[HttpHeaders] {
+ override def lookup(m: HttpHeaders, key: String) = Option(m.get(key))
+ }
+
def badRoute(ctx: ChannelHandlerContext) =
simpleResponse(ctx, BadGateway, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
@@ -82,7 +86,7 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
}
}
- def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: JsValue, result: HandlerResult, startedAt: Long) = {
+ def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: String, result: HandlerResult, startedAt: Long) = {
var closeOpt = CloseOpt
var headers = mutable.ArrayBuilder.make[(String, String)]
@@ -119,43 +123,68 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
}
}
+ /**
+ * Answers a health-check style request.
+ *
+ * @param ctx channel to write the response to
+ * @param predicate current state of the flag being checked; `true` replies 200 with
+ * `NettyServer.deployInfo` as the body, `false` replies 404
+ */
+ private def healthCheck(ctx: ChannelHandlerContext)(predicate: Boolean): Unit = {
+ if (!predicate) {
+ simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt)
+ } else {
+ // the buffer is only allocated on the healthy path
+ val deployInfoBuf = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8)
+ simpleResponse(ctx, Ok, byteBufOpt = Option(deployInfoBuf), channelFutureListenerOpt = CloseOpt)
+ }
+ }
+
+ /**
+ * Applies a new boolean flag value and echoes it back to the client.
+ *
+ * @param ctx channel to write the response to
+ * @param newValue value to store
+ * @param updateOp setter invoked with `newValue` before the response is written
+ */
+ private def updateHealthCheck(ctx: ChannelHandlerContext)(newValue: Boolean)(updateOp: Boolean => Unit): Unit = {
+ updateOp(newValue)
+ // the response body is the textual form of the value that was just applied
+ val echoedValue = Option(Unpooled.copiedBuffer(newValue.toString, CharsetUtil.UTF_8))
+ simpleResponse(ctx, Ok, byteBufOpt = echoedValue, channelFutureListenerOpt = CloseOpt)
+ }
+
override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = {
val uri = req.getUri
val startedAt = System.currentTimeMillis()
-
+ val checkFunc = healthCheck(ctx) _
+ val updateFunc = updateHealthCheck(ctx) _
req.getMethod match {
case HttpMethod.GET =>
uri match {
- case "/health_check.html" =>
- if (NettyServer.isHealthy) {
- val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8)
- simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt)
+ case "/health_check.html" => checkFunc(NettyServer.isHealthy)
+ case "/fallback_check.html" => checkFunc(NettyServer.isFallbackHealthy)
+ case "/query_fallback_check.html" => checkFunc(NettyServer.isQueryFallbackHealthy)
+ case s if s.startsWith("/graphs/getEdge/") =>
+ if (!NettyServer.isQueryFallbackHealthy) {
+ val result = HandlerResult(body = Future.successful(PostProcess.emptyResults))
+ toResponse(ctx, req, s, result, startedAt)
} else {
- simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt)
+ val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4)
+ val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+ val result = s2rest.checkEdges(params)
+ toResponse(ctx, req, s, result, startedAt)
}
-
- case s if s.startsWith("/graphs/getEdge/") =>
- // src, tgt, label, dir
- val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4)
- val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
- val result = s2rest.checkEdges(params)
- toResponse(ctx, req, params, result, startedAt)
case _ => badRoute(ctx)
}
case HttpMethod.PUT =>
if (uri.startsWith("/health_check/")) {
- val newHealthCheck = uri.split("/").last.toBoolean
- NettyServer.isHealthy = newHealthCheck
- val newHealthCheckMsg = Unpooled.copiedBuffer(NettyServer.isHealthy.toString, CharsetUtil.UTF_8)
- simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt)
- } else badRoute(ctx)
+ val newValue = uri.split("/").last.toBoolean
+ updateFunc(newValue) { v => NettyServer.isHealthy = v }
+ } else if (uri.startsWith("/query_fallback_check/")) {
+ val newValue = uri.split("/").last.toBoolean
+ updateFunc(newValue) { v => NettyServer.isQueryFallbackHealthy = v }
+ } else if (uri.startsWith("/fallback_check/")) {
+ val newValue = uri.split("/").last.toBoolean
+ updateFunc(newValue) { v => NettyServer.isFallbackHealthy = v }
+ } else {
+ badRoute(ctx)
+ }
case HttpMethod.POST =>
val body = req.content.toString(CharsetUtil.UTF_8)
-
- val result = s2rest.doPost(uri, body, Option(req.headers().get(Experiment.impressionKey)))
- toResponse(ctx, req, Json.parse(body), result, startedAt)
+ if (!NettyServer.isQueryFallbackHealthy) {
+ val result = HandlerResult(body = Future.successful(PostProcess.emptyResults))
+ toResponse(ctx, req, body, result, startedAt)
+ } else {
+ val result = s2rest.doPost(uri, body, req.headers())
+ toResponse(ctx, req, body, result, startedAt)
+ }
case _ =>
simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
@@ -163,9 +192,14 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
}
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
- cause.printStackTrace()
- logger.error(s"exception on query.", cause)
- simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+ cause match {
+ case e: java.io.IOException =>
+ ctx.channel().close().addListener(CloseOpt.get)
+ case _ =>
+ cause.printStackTrace()
+ logger.error(s"exception on query.", cause)
+ simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+ }
}
}
@@ -178,6 +212,7 @@ object NettyServer {
val config = ConfigFactory.load()
val port = Try(config.getInt("http.port")).recover { case _ => 9000 }.get
val transport = Try(config.getString("netty.transport")).recover { case _ => "jdk" }.get
+ val maxBodySize = Try(config.getInt("max.body.size")).recover { case _ => 65536 * 2 }.get
// init s2graph with config
val s2graph = new Graph(config)(ec)
@@ -185,6 +220,8 @@ object NettyServer {
val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
var isHealthy = config.getBooleanWithFallback("app.health.on", true)
+ var isFallbackHealthy = true
+ var isQueryFallbackHealthy = true
logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
logger.info(s"transport: $transport")
@@ -201,14 +238,13 @@ object NettyServer {
try {
val b: ServerBootstrap = new ServerBootstrap()
.option(ChannelOption.SO_BACKLOG, Int.box(2048))
-
b.group(bossGroup, workerGroup).channel(channelClass)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel) {
val p = ch.pipeline()
p.addLast(new HttpServerCodec())
- p.addLast(new HttpObjectAggregator(65536))
+ p.addLast(new HttpObjectAggregator(maxBodySize))
p.addLast(new S2RestHandler(rest)(ec))
}
})
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
index 30a5ee4..692ab1e 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
@@ -52,7 +52,7 @@ object Global extends WithFilters(new GzipFilter()) {
// init s2graph with config
s2graph = new Graph(config)(ec)
storageManagement = new Management(s2graph)
- s2parser = new RequestParser(s2graph.config) // merged config
+ s2parser = new RequestParser(s2graph)
s2rest = new RestHandler(s2graph)(ec)
logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
@@ -83,7 +83,7 @@ object Global extends WithFilters(new GzipFilter()) {
wallLogHandler.shutdown()
QueueActor.shutdown()
- /*
+ /**
* shutdown hbase client for flush buffers.
*/
shutdown()
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
index 3c488fe..3c49954 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
@@ -43,8 +43,12 @@ object Config {
lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("kafka.metadata.broker.list").getOrElse("localhost")
lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}"
+ lazy val KAFKA_LOG_TOPIC_JSON = s"s2graphIn${PHASE}Json"
lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async"
+ lazy val KAFKA_LOG_TOPIC_ASYNC_JSON = s"s2graphIn${PHASE}AsyncJson"
lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed"
+ lazy val KAFKA_FAIL_TOPIC_JSON = s"s2graphIn${PHASE}FailedJson"
+ lazy val KAFKA_MUTATE_FAIL_TOPIC = s"mutateFailed_${PHASE}"
// is query or write
lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
index 13639b9..b32c16a 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
@@ -22,10 +22,10 @@ package org.apache.s2graph.rest.play.controllers
import akka.util.ByteString
import org.apache.s2graph.core.GraphExceptions.BadQueryException
import org.apache.s2graph.core.PostProcess
+import org.apache.s2graph.core.rest.RestHandler.CanLookup
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.rest.play.config.Config
import play.api.http.HttpEntity
-import play.api.libs.iteratee.Enumerator
import play.api.libs.json.{JsString, JsValue}
import play.api.mvc._
@@ -42,6 +42,10 @@ object ApplicationController extends Controller {
val jsonText: BodyParser[String] = s2parse.jsonText
+ /**
+ * `CanLookup` instance for Play `Headers`; delegates directly to `Headers.get`.
+ * The type is declared explicitly so implicit resolution does not depend on inference
+ * or on where in the object this val is defined.
+ */
+ implicit val oneTupleLookup: CanLookup[Headers] = new CanLookup[Headers] {
+ override def lookup(m: Headers, key: String) = m.get(key)
+ }
+
private def badQueryExceptionResults(ex: Exception) =
Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
index 53f3fce..b3ac89d 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
@@ -156,9 +156,11 @@ object CounterController extends Controller {
useProfile = useProfile, bucketImpId, useRank = useRank, ttl, dailyTtl, Some(hbaseTable), intervalUnit,
rateActionId, rateBaseId, rateThreshold)
- // prepare exact storage
- exactCounter(version).prepare(policy)
- if (useRank) {
+ if (rateAction.isEmpty) {
+ // prepare exact storage
+ exactCounter(version).prepare(policy)
+ }
+ if (useRank || rateAction.isDefined) {
// prepare ranking storage
rankingCounter(version).prepare(policy)
}
@@ -253,8 +255,11 @@ object CounterController extends Controller {
// change table name
val newTableName = Seq(tablePrefixMap(version), service, policy.ttl) ++ policy.dailyTtl mkString "_"
val newPolicy = policy.copy(version = version, hbaseTable = Some(newTableName))
- exactCounter(version).prepare(newPolicy)
- if (newPolicy.useRank) {
+
+ if (newPolicy.rateActionId.isEmpty) {
+ exactCounter(version).prepare(newPolicy)
+ }
+ if (newPolicy.useRank || newPolicy.rateActionId.isDefined) {
rankingCounter(version).prepare(newPolicy)
}
Ok(Json.toJson(Map("msg" -> s"prepare storage v$version $service/$action")))
@@ -272,8 +277,10 @@ object CounterController extends Controller {
policy <- counterModel.findByServiceAction(service, action, useCache = false)
} yield {
Try {
- exactCounter(policy.version).destroy(policy)
- if (policy.useRank) {
+ if (policy.rateActionId.isEmpty) {
+ exactCounter(policy.version).destroy(policy)
+ }
+ if (policy.useRank || policy.rateActionId.isDefined) {
rankingCounter(policy.version).destroy(policy)
}
counterModel.deleteServiceAction(policy)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index da88c3d..8000cf8 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -19,6 +19,7 @@
package org.apache.s2graph.rest.play.controllers
+import com.fasterxml.jackson.databind.JsonMappingException
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.Label
import org.apache.s2graph.core.rest.RequestParser
@@ -30,19 +31,19 @@ import play.api.mvc.{Controller, Result}
import scala.collection.Seq
import scala.concurrent.Future
-import scala.util.Random
object EdgeController extends Controller {
import ApplicationController._
+ import ExceptionHandler._
import play.api.libs.concurrent.Execution.Implicits._
private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser
private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
- private def enqueue(topic: String, elem: GraphElement, tsv: String) = {
- val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, Option(tsv))
+ private def enqueue(topic: String, elem: GraphElement, tsv: String, publishJson: Boolean = false) = {
+ val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, Option(tsv), publishJson)
walLogHandler.enqueue(kafkaMessage)
}
@@ -50,63 +51,129 @@ object EdgeController extends Controller {
val kafkaTopic = toKafkaTopic(graphElem.isAsync)
graphElem match {
- case v: Vertex => enqueue(kafkaTopic, graphElem, tsv)
+ case v: Vertex =>
+ enqueue(kafkaTopic, graphElem, tsv)
case e: Edge =>
e.label.extraOptions.get("walLog") match {
- case None => enqueue(kafkaTopic, e, tsv)
+ case None =>
+ enqueue(kafkaTopic, e, tsv)
case Some(walLogOpt) =>
- (walLogOpt \ "method").as[JsValue] match {
+ (walLogOpt \ "method").get match {
case JsString("drop") => // pass
case JsString("sample") =>
val rate = (walLogOpt \ "rate").as[Double]
- if (scala.util.Random.nextDouble() < rate) enqueue(kafkaTopic, e, tsv)
- case _ => enqueue(kafkaTopic, e, tsv)
+ if (scala.util.Random.nextDouble() < rate) {
+ enqueue(kafkaTopic, e, tsv)
+ }
+ case _ =>
+ enqueue(kafkaTopic, e, tsv)
}
}
+ case _ => logger.error(s"Unknown graph element type: ${graphElem}")
}
}
+ /**
+ * Builds one "deleteAll" TSV message per (source vertex, label) pair, addressed to
+ * `Config.KAFKA_MUTATE_FAIL_TOPIC`. The vertex id is used for both the from and to columns.
+ *
+ * @param srcVertices vertices whose adjacent edges were being deleted
+ * @param labels labels the delete was issued for
+ * @param dir direction code of the delete
+ * @param ts timestamp written into the first TSV column
+ */
+ private def toDeleteAllFailMessages(srcVertices: Seq[Vertex], labels: Seq[Label], dir: Int, ts: Long ) = {
+ srcVertices.flatMap { vertex =>
+ val id = vertex.id.toString
+ labels.map { label =>
+ // NOTE(review): `dir` is a direction code but is rendered with `GraphUtil.fromOp` -- confirm this is intended.
+ val columns = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte))
+ ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, columns.mkString("\t"))
+ }
+ }
+ }
+
+ /** Hands every given message to the WAL log handler's queue, in order. */
+ private def publishFailTopic(kafkaMessages: Seq[KafkaMessage]): Unit = {
+ for (message <- kafkaMessages) walLogHandler.enqueue(message)
+ }
+
+ /**
+ * Mutates the given elements and, once the mutation completes, publishes a message to
+ * `Config.KAFKA_MUTATE_FAIL_TOPIC` for every edge whose individual result was `false`.
+ *
+ * @param elements graph elements paired with the TSV line they were parsed from
+ * @return the future returned by `s2.mutateElements`, unchanged
+ */
+ def mutateElementsWithFailLog(elements: Seq[(GraphElement, String)]) ={
+ val result = s2.mutateElements(elements.map(_._1), true)
+ result onComplete {
+ case scala.util.Success(rets) =>
+ // results are positionally aligned with the submitted elements
+ rets.zip(elements).foreach {
+ case (false, (e: Edge, tsv: String)) =>
+ val kafkaMessages = if (e.op == GraphUtil.operations("deleteAll")) {
+ toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.label), e.labelWithDir.dir, e.ts)
+ } else {
+ Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv)))
+ }
+ publishFailTopic(kafkaMessages)
+ case _ => // succeeded, or not an edge: nothing to publish
+ }
+ case scala.util.Failure(ex) =>
+ // The whole batch failed; log it here instead of rethrowing inside the callback.
+ // The caller still observes the failure through the returned future.
+ logger.error(s"mutateElementsWithFailLog: ${ex.getMessage}", ex)
+ }
+ result
+ }
+
private def tryMutate(elementsWithTsv: Seq[(GraphElement, String)], withWait: Boolean): Future[Result] = {
if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
else {
- try {
- elementsWithTsv.foreach { case (graphElem, tsv) =>
- publish(graphElem, tsv)
- }
+ elementsWithTsv.foreach { case (graphElem, tsv) =>
+ publish(graphElem, tsv)
+ }
- val elementsToStore = for {
- (e, _tsv) <- elementsWithTsv if !skipElement(e.isAsync)
- } yield e
-
- if (elementsToStore.isEmpty) Future.successful(jsonResponse(JsArray()))
- else {
- if (withWait) {
- val rets = s2.mutateElements(elementsToStore, withWait)
- rets.map(Json.toJson(_)).map(jsonResponse(_))
- } else {
- val rets = elementsToStore.map { element => QueueActor.router ! element; true }
- Future.successful(jsonResponse(Json.toJson(rets)))
+ if (elementsWithTsv.isEmpty) Future.successful(jsonResponse(JsArray()))
+ else {
+ val elementWithIdxs = elementsWithTsv.zipWithIndex
+ if (withWait) {
+ val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) =>
+ !skipElement(element.isAsync)
+ }
+ val retToSkip = elementAsync.map(_._2 -> true)
+ val elementsToStore = elementSync.map(_._1)
+ val elementsIdxToStore = elementSync.map(_._2)
+ mutateElementsWithFailLog(elementsToStore).map { rets =>
+ elementsIdxToStore.zip(rets) ++ retToSkip
+ }.map { rets =>
+ Json.toJson(rets.sortBy(_._1).map(_._2))
+ }.map(jsonResponse(_))
+ } else {
+ val rets = elementWithIdxs.map { case ((element, tsv), idx) =>
+ if (!skipElement(element.isAsync)) QueueActor.router ! (element, tsv)
+ true
}
+ Future.successful(jsonResponse(Json.toJson(rets)))
}
- } catch {
- case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
- case e: Throwable =>
- logger.error(s"tryMutate: ${e.getMessage}", e)
- Future.successful(InternalServerError(s"${e.getStackTrace}"))
}
}
}
def mutateJsonFormat(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
logger.debug(s"$jsValue")
- val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation)
- tryMutate(edgesWithTsv, withWait)
+
+ try {
+ val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation)
+ tryMutate(edgesWithTsv, withWait)
+ } catch {
+ case e: JsonMappingException =>
+ logger.malformed(jsValue, e)
+ Future.successful(BadRequest(s"${e.getMessage}"))
+ case e: GraphExceptions.JsonParseException =>
+ logger.malformed(jsValue, e)
+ Future.successful(BadRequest(s"${e.getMessage}"))
+ case e: Exception =>
+ logger.malformed(jsValue, e)
+ Future.failed(e)
+ }
}
def mutateBulkFormat(str: String, withWait: Boolean = false): Future[Result] = {
logger.debug(s"$str")
- val elementsWithTsv = requestParser.parseBulkFormat(str)
- tryMutate(elementsWithTsv, withWait)
+
+ try {
+ val elementsWithTsv = requestParser.parseBulkFormat(str)
+ tryMutate(elementsWithTsv, withWait)
+ } catch {
+ case e: JsonMappingException =>
+ logger.malformed(str, e)
+ Future.successful(BadRequest(s"${e.getMessage}"))
+ case e: GraphExceptions.JsonParseException =>
+ logger.malformed(str, e)
+ Future.successful(BadRequest(s"${e.getMessage}"))
+ case e: Exception =>
+ logger.malformed(str, e)
+ Future.failed(e)
+ }
}
def mutateBulk() = withHeaderAsync(parse.text) { request =>
@@ -163,29 +230,34 @@ object EdgeController extends Controller {
if (edges.isEmpty) Future.successful(jsonResponse(JsArray()))
else {
+
s2.incrementCounts(edges, withWait = true).map { results =>
val json = results.map { case (isSuccess, resultCount) =>
Json.obj("success" -> isSuccess, "result" -> resultCount)
}
+
jsonResponse(Json.toJson(json))
}
}
}
def deleteAll() = withHeaderAsync(jsonParser) { request =>
-// deleteAllInner(request.body, withWait = false)
deleteAllInner(request.body, withWait = true)
}
+ /** Same as `deleteAll`, but calls `deleteAllInner` with `withWait = false`. */
+ def deleteAllWithOutWait() = withHeaderAsync(jsonParser) { req =>
+ val payload = req.body
+ deleteAllInner(payload, withWait = false)
+ }
+
def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
- /* logging for delete all request */
+ /** logging for delete all request */
def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = {
val kafkaMessages = for {
id <- ids
label <- labels
} yield {
- val tsv = Seq(ts, "deleteAll", "e", requestParser.jsToStr(id), requestParser.jsToStr(id), label.label, "{}", direction).mkString("\t")
+ val tsv = Seq(ts, "deleteAll", "e", RequestParser.jsToStr(id), RequestParser.jsToStr(id), label.label, "{}", direction).mkString("\t")
val topic = topicOpt.getOrElse { toKafkaTopic(label.isAsync) }
ExceptionHandler.toKafkaMessage(topic, tsv)
@@ -194,10 +266,18 @@ object EdgeController extends Controller {
kafkaMessages.foreach(walLogHandler.enqueue)
}
- def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], ts: Long, vertices: Seq[Vertex]) = {
- enqueueLogMessage(ids, labels, ts, direction, None)
+ def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue],
+ ts: Long, vertices: Seq[Vertex]) = {
+
val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)
if (withWait) {
+ future onComplete {
+ case ret =>
+ if (!ret.get) {
+ val messages = toDeleteAllFailMessages(vertices.toList, labels, GraphUtil.directions(direction), ts)
+ publishFailTopic(messages)
+ }
+ }
future
} else {
Future.successful(true)
@@ -205,9 +285,13 @@ object EdgeController extends Controller {
}
val deleteFutures = jsValue.as[Seq[JsValue]].map { json =>
- val (labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json)
+ val (_labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json)
+ val srcVertices = vertices
+ enqueueLogMessage(ids, _labels, ts, direction, None)
+ val labels = _labels.filterNot(e => skipElement(e.isAsync))
+
if (labels.isEmpty || ids.isEmpty) Future.successful(true)
- else deleteEach(labels, direction, ids, ts, vertices)
+ else deleteEach(labels, direction, ids, ts, srcVertices)
}
val deleteResults = Future.sequence(deleteFutures)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
index 9c4a061..760211a 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
@@ -19,7 +19,6 @@
package org.apache.s2graph.rest.play.controllers
-import org.apache.s2graph.core.mysqls.Experiment
import org.apache.s2graph.core.rest.RestHandler
import play.api.mvc._
@@ -30,10 +29,10 @@ object ExperimentController extends Controller {
import ApplicationController._
+ /** Variant of `experiment` that passes empty strings for access token, experiment name and uuid. */
+ def experiments() = experiment(accessToken = "", experimentName = "", uuid = "")
def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(jsonText) { request =>
val body = request.body
-
- val res = rest.doPost(request.uri, body, request.headers.get(Experiment.impressionKey))
+ val res = rest.doPost(request.uri, body, request.headers)
res.body.map { case js =>
val headers = res.headers :+ ("result_size" -> rest.calcSize(js).toString)
jsonResponse(js, headers: _*)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
index 0260b7a..6a7d4f7 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
@@ -63,12 +63,4 @@ object PublishController extends Controller {
def publish(topic: String) = publishOnly(topic)
- // def mutateBulk(topic: String) = Action.async(parse.text) { request =>
- // EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, Config.KAFKA_FAIL_TOPIC, request.body).map { result =>
- // result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10")
- // }
- // }
- def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request =>
- EdgeController.mutateBulkFormat(request.body)
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
index 495cf7b..1e49ca7 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
@@ -19,8 +19,6 @@
package org.apache.s2graph.rest.play.controllers
-import org.apache.s2graph.core.JSONParser
-import org.apache.s2graph.core.mysqls.Experiment
import org.apache.s2graph.core.rest.RestHandler
import play.api.libs.json.Json
import play.api.mvc._
@@ -31,13 +29,11 @@ object QueryController extends Controller {
import ApplicationController._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
-
private val rest: RestHandler = org.apache.s2graph.rest.play.Global.s2rest
def delegate(request: Request[String]) = {
- rest.doPost(request.uri, request.body, request.headers.get(Experiment.impressionKey)).body.map {
- js =>
- jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
+ rest.doPost(request.uri, request.body, request.headers).body.map { js =>
+ jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
} recoverWith ApplicationController.requestFallback(request.body)
}
@@ -58,13 +54,12 @@ object QueryController extends Controller {
def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonText)(delegate)
def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) =
- withHeaderAsync(jsonText) {
- request =>
- val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
- rest.checkEdges(params).body.map {
- js =>
- jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
- } recoverWith ApplicationController.requestFallback(request.body)
+ withHeaderAsync(jsonText) { request =>
+ val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+ rest.checkEdges(params).body.map {
+ js =>
+ jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
+ } recoverWith ApplicationController.requestFallback(request.body)
}
def getVertices() = withHeaderAsync(jsonText)(delegate)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
index 0fdbe43..72e6e82 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
@@ -50,14 +50,16 @@ object VertexController extends Controller {
}
//FIXME:
- val verticesToStore = vertices.filterNot(v => v.isAsync)
-
- if (withWait) {
- val rets = s2.mutateVertices(verticesToStore, withWait = true)
- rets.map(Json.toJson(_)).map(jsonResponse(_))
- } else {
- val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true }
- Future.successful(jsonResponse(Json.toJson(rets)))
+ val verticesToStore = vertices.filterNot(v => skipElement(v.isAsync))
+ if (verticesToStore.isEmpty) Future.successful(jsonResponse(Json.toJson(Seq.empty[Boolean])))
+ else {
+ if (withWait) {
+ val rets = s2.mutateVertices(verticesToStore, withWait = true)
+ rets.map(Json.toJson(_)).map(jsonResponse(_))
+ } else {
+ val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true }
+ Future.successful(jsonResponse(Json.toJson(rets)))
+ }
}
} catch {
case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"e"))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/conf/reference.conf
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/reference.conf b/s2rest_play/conf/reference.conf
index bda503c..c3b716b 100644
--- a/s2rest_play/conf/reference.conf
+++ b/s2rest_play/conf/reference.conf
@@ -125,6 +125,7 @@ local.queue.actor.rate.limit=1000000
# local retry number
max.retry.number=100
max.back.off=50
+back.off.timeout=1000
delete.all.fetch.size=10000
hbase.fail.prob=-1.0
lock.expire.time=600000
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/conf/routes
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/routes b/s2rest_play/conf/routes
index ee49c69..c360e6f 100644
--- a/s2rest_play/conf/routes
+++ b/s2rest_play/conf/routes
@@ -23,7 +23,7 @@
# publish
-POST /publish/:topic org.apache.s2graph.rest.play.controllers.PublishController.mutateBulk(topic)
+#POST /publish/:topic org.apache.s2graph.rest.play.controllers.PublishController.mutateBulk(topic)
POST /publishOnly/:topic org.apache.s2graph.rest.play.controllers.PublishController.publishOnly(topic)
#### Health Check
@@ -37,6 +37,7 @@ POST /graphs/edges/insertBulk org.ap
POST /graphs/edges/delete org.apache.s2graph.rest.play.controllers.EdgeController.deletes()
POST /graphs/edges/deleteWithWait org.apache.s2graph.rest.play.controllers.EdgeController.deletesWithWait()
POST /graphs/edges/deleteAll org.apache.s2graph.rest.play.controllers.EdgeController.deleteAll()
+POST /graphs/edges/deleteAllWithOutWait org.apache.s2graph.rest.play.controllers.EdgeController.deleteAllWithOutWait()
POST /graphs/edges/update org.apache.s2graph.rest.play.controllers.EdgeController.updates()
POST /graphs/edges/updateWithWait org.apache.s2graph.rest.play.controllers.EdgeController.updatesWithWait()
POST /graphs/edges/increment org.apache.s2graph.rest.play.controllers.EdgeController.increments()
@@ -81,7 +82,7 @@ GET /graphs/getLabels/:serviceName org.ap
POST /graphs/createLabel org.apache.s2graph.rest.play.controllers.AdminController.createLabel()
POST /graphs/addIndex org.apache.s2graph.rest.play.controllers.AdminController.addIndex()
GET /graphs/getLabel/:labelName org.apache.s2graph.rest.play.controllers.AdminController.getLabel(labelName)
-PUT /graphs/deleteLabel/:labelName org.apache.s2graph.rest.play.controllers.AdminController.deleteLabel(labelName)
+PUT /graphs/deleteLabelReally/:labelName org.apache.s2graph.rest.play.controllers.AdminController.deleteLabel(labelName)
POST /graphs/addProp/:labelName org.apache.s2graph.rest.play.controllers.AdminController.addProp(labelName)
POST /graphs/createServiceColumn org.apache.s2graph.rest.play.controllers.AdminController.createServiceColumn()
@@ -117,7 +118,7 @@ POST /counter/v1/mget org.apac
# Experiment API
POST /graphs/experiment/:accessToken/:experimentName/:uuid org.apache.s2graph.rest.play.controllers.ExperimentController.experiment(accessToken, experimentName, uuid)
-
+POST /graphs/experiments org.apache.s2graph.rest.play.controllers.ExperimentController.experiments()
# Map static resources from the /public folder to the /assets URL path
GET /images/*file controllers.Assets.at(path="/public/images", file)
[2/3] incubator-s2graph git commit: [S2GRAPH-121]: Create `Result`
class to hold traverse result edges.
Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
index ede1127..0377bd8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
@@ -19,10 +19,53 @@
package org.apache.s2graph.core
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId}
import play.api.libs.json.Json
-
+//
+//object S2Vertex {
+// def apply(graph: Graph, vertex: Vertex): S2Vertex = {
+// S2Vertex(graph,
+// vertex.serviceName,
+// vertex.serviceColumn.columnName,
+// vertex.innerIdVal,
+// vertex.serviceColumn.innerValsToProps(vertex.props),
+// vertex.ts,
+// GraphUtil.fromOp(vertex.op)
+// )
+// }
+//}
+//
+//case class S2Vertex(graph: Graph,
+// serviceName: String,
+// columnName: String,
+// id: Any,
+// props: Map[String, Any] = Map.empty,
+// ts: Long = System.currentTimeMillis(),
+// operation: String = "insert") extends GraphElement {
+// lazy val vertex = {
+// val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+// val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+// val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+//
+// val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion))
+// val propsInner = column.propsToInnerVals(props) ++
+// Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
+//
+// Vertex(srcVertexId, ts, propsInner, op)
+// }
+//
+// val uniqueId = (serviceName, columnName, id)
+//
+// override def isAsync: Boolean = vertex.isAsync
+//
+// override def toLogString(): String = vertex.toLogString()
+//
+// override def queueKey: String = vertex.queueKey
+//
+// override def queuePartitionKey: String = vertex.queuePartitionKey
+//}
case class Vertex(id: VertexId,
ts: Long = System.currentTimeMillis(),
props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
@@ -31,10 +74,19 @@ case class Vertex(id: VertexId,
val innerId = id.innerId
+ val innerIdVal = innerId.value
+
+ lazy val properties = for {
+ (k, v) <- props
+ meta <- serviceColumn.metasMap.get(k)
+ } yield meta.name -> v.value
+
def schemaVer = serviceColumn.schemaVersion
def serviceColumn = ServiceColumn.findById(id.colId)
+ def columnName = serviceColumn.columnName
+
def service = Service.findById(serviceColumn.serviceId)
lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName)
@@ -114,7 +166,21 @@ object Vertex {
def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue
- // val emptyVertex = Vertex(new CompositeId(CompositeId.defaultColId, CompositeId.defaultInnerId, false, true),
- // System.currentTimeMillis())
- def fromString(s: String): Option[Vertex] = Graph.toVertex(s)
+ def toVertex(serviceName: String,
+ columnName: String,
+ id: Any,
+ props: Map[String, Any] = Map.empty,
+ ts: Long = System.currentTimeMillis(),
+ operation: String = "insert"): Vertex = {
+
+ val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+ val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+ val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+ val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion))
+ val propsInner = column.propsToInnerVals(props) ++
+ Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
+
+ new Vertex(srcVertexId, ts, propsInner, op)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
index 9bd172d..cd825f4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
@@ -25,7 +25,8 @@ import scalikejdbc._
import scala.util.Random
object Experiment extends Model[Experiment] {
- val impressionKey = "S2-Impression-Id"
+ val ImpressionKey = "S2-Impression-Id"
+ val ImpressionId = "Impression-Id"
def apply(rs: WrappedResultSet): Experiment = {
Experiment(rs.intOpt("id"),
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 2734211..f7318f6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -19,13 +19,15 @@
package org.apache.s2graph.core.mysqls
+import java.util.Calendar
import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.JSONParser._
-import play.api.libs.json._
+import play.api.libs.json.{JsObject, JsValue, Json}
import scalikejdbc._
object Label extends Model[Label] {
@@ -48,6 +50,7 @@ object Label extends Model[Label] {
Label.delete(id.get)
}
+
def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
val cacheKey = "label=" + labelName
lazy val labelOpt =
@@ -292,11 +295,16 @@ case class Label(id: Option[Int], label: String,
lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found"))
lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found"))
- lazy val direction = if (isDirected) "out" else "undirected"
lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq)
//TODO: Make sure this is correct
+
+// lazy val metas = metas(useCache = true)
lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
+ lazy val labelMetas = LabelMeta.findAllByLabelId(id.get, useCache = true)
+ lazy val labelMetaSet = labelMetas.toSet
+ lazy val labelMetaMap = (labelMetas ++ LabelMeta.reservedMetas).map(m => m.seq -> m).toMap
+
lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap
lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap
lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap
@@ -327,14 +335,37 @@ case class Label(id: Option[Int], label: String,
jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
} yield prop.name -> jsValue).toMap
+ lazy val metaPropsDefaultMapInnerString = (for {
+ prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+ innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
+ } yield prop.name -> innerVal).toMap
+
lazy val metaPropsDefaultMapInner = (for {
prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+ innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
+ } yield prop -> innerVal).toMap
+ lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq
+ lazy val metaPropsJsValueWithDefault = (for {
+ prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
- } yield prop.name -> jsValue).toMap
+ } yield prop -> jsValue).toMap
+// lazy val extraOptions = Model.extraOptions(Option("""{
+// "storage": {
+// "s2graph.storage.backend": "rocks",
+// "rocks.db.path": "/tmp/db"
+// }
+// }"""))
lazy val extraOptions: Map[String, JsValue] = options match {
case None => Map.empty
- case Some(v) => Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
+ case Some(v) =>
+ try {
+ Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
+ } catch {
+ case e: Exception =>
+ logger.error(s"An error occurs while parsing the extra label option: ${label}", e)
+ Map.empty
+ }
}
def srcColumnWithDir(dir: Int) = {
@@ -386,5 +417,23 @@ case class Label(id: Option[Int], label: String,
)
+ def propsToInnerValsWithTs(props: Map[String, Any],
+ ts: Long = System.currentTimeMillis()): Map[Byte, InnerValLikeWithTs] = {
+ for {
+ (k, v) <- props
+ labelMeta <- metaPropsInvMap.get(k)
+ innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
+ } yield labelMeta.seq -> InnerValLikeWithTs(innerVal, ts)
+
+ }
+
+ def innerValsWithTsToProps(props: Map[Byte, InnerValLikeWithTs]): Map[String, Any] = {
+ for {
+ (k, v) <- props
+ labelMeta <- metaPropsMap.get(k)
+ } yield {
+ labelMeta.name -> v.innerVal.value
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index b68fa79..6fceabc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -24,6 +24,8 @@ package org.apache.s2graph.core.mysqls
*/
import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, InnerValLike}
import play.api.libs.json.Json
import scalikejdbc._
object ServiceColumn extends Model[ServiceColumn] {
@@ -89,13 +91,40 @@ object ServiceColumn extends Model[ServiceColumn] {
})
}
}
-case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) {
+case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) {
lazy val service = Service.findById(serviceId)
lazy val metas = ColumnMeta.findAllByColumn(id.get)
+ lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta } toMap
lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap
lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap
lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
+ def propsToInnerVals(props: Map[String, Any]): Map[Int, InnerValLike] = {
+ for {
+ (k, v) <- props
+ labelMeta <- metasInvMap.get(k)
+ innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
+ } yield labelMeta.seq.toInt -> innerVal
+ }
+
+ def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = {
+ for {
+ (k, v) <- props
+ columnMeta <- metasMap.get(k)
+ } yield {
+ columnMeta.name -> v.value
+ }
+ }
+
+ def innerValsWithTsToProps(props: Map[Int, InnerValLikeWithTs]): Map[String, Any] = {
+ for {
+ (k, v) <- props
+ columnMeta <- metasMap.get(k)
+ } yield {
+ columnMeta.name -> v.innerVal.value
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index 5920f3c..1c93667 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.parsers
import org.apache.s2graph.core.GraphExceptions.WhereParserException
import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
import org.apache.s2graph.core.types.InnerValLike
-import org.apache.s2graph.core.{Edge, GraphExceptions, JSONParser}
+import org.apache.s2graph.core.Edge
import org.apache.s2graph.core.JSONParser._
import scala.annotation.tailrec
import scala.util.Try
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 52ee50d..d77ac7d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -36,9 +36,11 @@ import scala.util.{Failure, Success, Try}
object TemplateHelper {
val findVar = """\"?\$\{(.*?)\}\"?""".r
val num = """(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r
+
val hour = 60 * 60 * 1000L
val day = hour * 24L
val week = day * 7L
+
def calculate(now: Long, n: Int, unit: String): Long = {
val duration = unit match {
case "hour" | "HOUR" => n * hour
@@ -72,12 +74,32 @@ object TemplateHelper {
}
}
-class RequestParser(config: Config) {
+object RequestParser {
+ type ExperimentParam = (JsObject, String, String, String, Option[String])
+ val defaultLimit = 100
+
+ def toJsValues(jsValue: JsValue): List[JsValue] = {
+ jsValue match {
+ case obj: JsObject => List(obj)
+ case arr: JsArray => arr.as[List[JsValue]]
+ case _ => List.empty[JsValue]
+ }
+ }
+
+ def jsToStr(js: JsValue): String = js match {
+ case JsString(s) => s
+ case _ => js.toString()
+ }
+
+}
+
+class RequestParser(graph: Graph) {
import Management.JsonModel._
+ import RequestParser._
- val hardLimit = 100000
- val defaultLimit = 100
+ val config = graph.config
+ val hardLimit = config.getInt("query.hardlimit")
val maxLimit = Int.MaxValue - 1
val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout")
val DefaultMaxAttempt = config.getInt("hbase.client.retries.number")
@@ -106,13 +128,11 @@ class RequestParser(config: Config) {
(labelOrderType.seq, value)
}
}
+
ret
}
- def extractInterval(label: Label, _jsValue: JsValue) = {
- val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString())
- val jsValue = Json.parse(replaced)
-
+ def extractInterval(label: Label, jsValue: JsValue) = {
def extractKv(js: JsValue) = js match {
case JsObject(map) => map.toSeq
case JsArray(arr) => arr.flatMap {
@@ -135,15 +155,21 @@ class RequestParser(config: Config) {
ret
}
- def extractDuration(label: Label, _jsValue: JsValue) = {
- val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString())
- val jsValue = Json.parse(replaced)
-
+ def extractDuration(label: Label, jsValue: JsValue) = {
for {
js <- parseOption[JsObject](jsValue, "duration")
} yield {
- val minTs = parseOption[Long](js, "from").getOrElse(Long.MaxValue)
- val maxTs = parseOption[Long](js, "to").getOrElse(Long.MinValue)
+ val minTs = (js \ "from").get match {
+ case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong
+ case JsNumber(n) => n.toLong
+ case _ => Long.MinValue
+ }
+
+ val maxTs = (js \ "to").get match {
+ case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong
+ case JsNumber(n) => n.toLong
+ case _ => Long.MaxValue
+ }
if (minTs > maxTs) {
throw new BadQueryException("Duration error. Timestamp of From cannot be larger than To.")
@@ -167,21 +193,24 @@ class RequestParser(config: Config) {
}
ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike])
}
-
+
def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = {
whereClauseOpt match {
case None => Success(WhereParser.success)
- case Some(_where) =>
- val where = TemplateHelper.replaceVariable(System.currentTimeMillis(), _where)
+ case Some(where) =>
val whereParserKey = s"${label.label}_${where}"
+
parserCache.get(whereParserKey, new Callable[Try[Where]] {
override def call(): Try[Where] = {
- WhereParser(label).parse(where) match {
+ val _where = TemplateHelper.replaceVariable(System.currentTimeMillis(), where)
+
+ WhereParser(label).parse(_where) match {
case s@Success(_) => s
case Failure(ex) => throw BadQueryException(ex.getMessage, ex)
}
}
})
+
}
}
@@ -194,28 +223,38 @@ class RequestParser(config: Config) {
} yield {
Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis())
}
- vertices.toSeq
+
+ vertices
}
- def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery = {
+ def toMultiQuery(jsValue: JsValue, impIdOpt: Option[String]): MultiQuery = {
val queries = for {
queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty)
} yield {
- toQuery(queryJson, isEdgeQuery)
+ toQuery(queryJson, impIdOpt = impIdOpt)
}
val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0))
- MultiQuery(queries = queries, weights = weights,
- queryOption = toQueryOption(jsValue), jsonQuery = jsValue)
+ MultiQuery(queries = queries, weights = weights, queryOption = toQueryOption(jsValue, impIdOpt))
}
- def toQueryOption(jsValue: JsValue): QueryOption = {
+ def toQueryOption(jsValue: JsValue, impIdOpt: Option[String]): QueryOption = {
val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name))
- val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q =>
+ val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v, impIdOpt = impIdOpt) }.map { q =>
q.copy(queryOption = q.queryOption.copy(filterOutFields = filterOutFields))
}
val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true)
val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty)
- val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty)
+// val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty)
+ val groupBy = (jsValue \ "groupBy").asOpt[JsValue].getOrElse(JsNull) match {
+ case obj: JsObject =>
+ val keys = (obj \ "key").asOpt[Seq[String]].getOrElse(Nil)
+ val groupByLimit = (obj \ "limit").asOpt[Int].getOrElse(hardLimit)
+ GroupBy(keys, groupByLimit)
+ case arr: JsArray =>
+ val keys = arr.asOpt[Seq[String]].getOrElse(Nil)
+ GroupBy(keys)
+ case _ => GroupBy.Empty
+ }
val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs =>
for {
js <- jsLs
@@ -238,7 +277,7 @@ class RequestParser(config: Config) {
QueryOption(removeCycle = removeCycle,
selectColumns = selectColumns,
- groupByColumns = groupByColumns,
+ groupBy = groupBy,
orderByColumns = orderByColumns,
filterOutQuery = filterOutQuery,
filterOutFields = filterOutFields,
@@ -247,10 +286,12 @@ class RequestParser(config: Config) {
limitOpt = limitOpt,
returnAgg = returnAgg,
scoreThreshold = scoreThreshold,
- returnDegree = returnDegree
+ returnDegree = returnDegree,
+ impIdOpt = impIdOpt
)
}
- def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = {
+
+ def toQuery(jsValue: JsValue, impIdOpt: Option[String]): Query = {
try {
val vertices =
(for {
@@ -274,7 +315,7 @@ class RequestParser(config: Config) {
if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty")
val steps = parse[Vector[JsValue]](jsValue, "steps")
- val queryOption = toQueryOption(jsValue)
+ val queryOption = toQueryOption(jsValue, impIdOpt)
val querySteps =
steps.zipWithIndex.map { case (step, stepIdx) =>
@@ -332,7 +373,7 @@ class RequestParser(config: Config) {
}
- val ret = Query(vertices, querySteps, queryOption, jsValue)
+ val ret = Query(vertices, querySteps, queryOption)
// logger.debug(ret.toString)
ret
} catch {
@@ -354,10 +395,8 @@ class RequestParser(config: Config) {
val limit = {
parseOption[Int](labelGroup, "limit") match {
case None => defaultLimit
- case Some(l) if l < 0 => maxLimit
- case Some(l) if l >= 0 =>
- val default = hardLimit
- Math.min(l, default)
+ case Some(l) if l < 0 => hardLimit
+ case Some(l) if l >= 0 => Math.min(l, hardLimit)
}
}
val offset = parseOption[Int](labelGroup, "offset").getOrElse(0)
@@ -405,7 +444,6 @@ class RequestParser(config: Config) {
// FIXME: Order of command matter
QueryParam(labelWithDir)
.sample(sample)
- .limit(offset, limit)
.rank(RankParam(label.id.get, scoring))
.exclude(exclude)
.include(include)
@@ -413,6 +451,7 @@ class RequestParser(config: Config) {
.has(hasFilter)
.labelOrderSeq(indexSeq)
.interval(interval)
+ .limit(offset, limit)
.where(where)
.duplicatePolicy(duplicate)
.includeDegree(includeDegree)
@@ -450,21 +489,8 @@ class RequestParser(config: Config) {
}
}
- def toJsValues(jsValue: JsValue): List[JsValue] = {
- jsValue match {
- case obj: JsObject => List(obj)
- case arr: JsArray => arr.as[List[JsValue]]
- case _ => List.empty[JsValue]
- }
- }
-
- def jsToStr(js: JsValue): String = js match {
- case JsString(s) => s
- case _ => js.toString()
- }
-
def parseBulkFormat(str: String): Seq[(GraphElement, String)] = {
- val edgeStrs = str.split("\\n")
+ val edgeStrs = str.split("\\n").filterNot(_.isEmpty)
val elementsWithTsv = for {
edgeStr <- edgeStrs
str <- GraphUtil.parseString(edgeStr)
@@ -480,24 +506,23 @@ class RequestParser(config: Config) {
}
private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, String)] = {
- val srcId = (jsValue \ "from").asOpt[JsValue].map(jsToStr)
- val tgtId = (jsValue \ "to").asOpt[JsValue].map(jsToStr)
- val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(jsToStr)) ++ srcId
- val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(tos => tos.map(jsToStr)) ++ tgtId
+ val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil)
+ val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil)
val label = parse[String](jsValue, "label")
val timestamp = parse[Long](jsValue, "timestamp")
- val direction = parseOption[String](jsValue, "direction").getOrElse("")
- val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
+ val direction = parseOption[String](jsValue, "direction").getOrElse("out")
+ val propsJson = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
for {
- srcId <- srcIds
- tgtId <- tgtIds
+ srcId <- srcIds.flatMap(jsValueToAny(_).toSeq)
+ tgtId <- tgtIds.flatMap(jsValueToAny(_).toSeq)
} yield {
- val edge = Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString)
+ // val edge = Management.toEdge(graph, timestamp, operation, srcId, tgtId, label, direction, fromJsonToProperties(propsJson))
+ val edge = Edge.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation)
val tsv = (jsValue \ "direction").asOpt[String] match {
- case None => Seq(timestamp, operation, "e", srcId, tgtId, label, props).mkString("\t")
- case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, props, dir).mkString("\t")
+ case None => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString).mkString("\t")
+ case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString, dir).mkString("\t")
}
(edge, tsv)
@@ -513,8 +538,8 @@ class RequestParser(config: Config) {
val ts = parseOption[Long](jsValue, "timestamp").getOrElse(System.currentTimeMillis())
val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get
val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get
- val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
- Management.toVertex(ts, operation, id.toString, sName, cName, props.toString)
+ val props = fromJsonToProperties((jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()))
+ Vertex.toVertex(sName, cName, id.toString, props, ts, operation)
}
def toPropElements(jsObj: JsValue) = Try {
@@ -637,7 +662,7 @@ class RequestParser(config: Config) {
def toDeleteParam(json: JsValue) = {
val labelName = (json \ "label").as[String]
- val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil).filterNot(_.isAsync)
+ val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil)
val direction = (json \ "direction").asOpt[String].getOrElse("out")
val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
@@ -645,4 +670,30 @@ class RequestParser(config: Config) {
val vertices = toVertices(labelName, direction, ids)
(labels, direction, ids, ts, vertices)
}
+
+ def toFetchAndDeleteParam(json: JsValue) = {
+ val labelName = (json \ "label").as[String]
+ val fromOpt = (json \ "from").asOpt[JsValue]
+ val toOpt = (json \ "to").asOpt[JsValue]
+ val direction = (json \ "direction").asOpt[String].getOrElse("out")
+ val indexOpt = (json \ "index").asOpt[String]
+ val propsOpt = (json \ "props").asOpt[JsObject]
+ (labelName, fromOpt, toOpt, direction, indexOpt, propsOpt)
+ }
+
+ def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = jsQuery.as[Seq[JsObject]].map { obj =>
+ def _require(field: String) = throw new RuntimeException(s"${field} not found")
+
+ val accessToken = (obj \ "accessToken").asOpt[String].getOrElse(_require("accessToken"))
+ val experimentName = (obj \ "experiment").asOpt[String].getOrElse(_require("experiment"))
+ val uuid = (obj \ "#uuid").get match {
+ case JsString(s) => s
+ case JsNumber(n) => n.toString
+ case _ => _require("#uuid")
+ }
+ val body = (obj \ "params").asOpt[JsObject].getOrElse(Json.obj())
+ val impKeyOpt = (obj \ Experiment.ImpressionKey).asOpt[String]
+
+ (body, accessToken, experimentName, uuid, impKeyOpt)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 55b3e79..4c77ad6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -21,7 +21,8 @@ package org.apache.s2graph.core.rest
import java.net.URL
-import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
import org.apache.s2graph.core.utils.logger
@@ -29,8 +30,21 @@ import play.api.libs.json._
import scala.concurrent.{ExecutionContext, Future}
-
object RestHandler {
+ trait CanLookup[A] {
+ def lookup(m: A, key: String): Option[String]
+ }
+
+ object CanLookup {
+ implicit val oneTupleLookup = new CanLookup[(String, String)] {
+ override def lookup(m: (String, String), key: String) =
+ if (m._1 == key) Option(m._2) else None
+ }
+ implicit val hashMapLookup = new CanLookup[Map[String, String]] {
+ override def lookup(m: Map[String, String], key: String): Option[String] = m.get(key)
+ }
+ }
+
case class HandlerResult(body: Future[JsValue], headers: (String, String)*)
}
@@ -41,25 +55,31 @@ object RestHandler {
class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
import RestHandler._
- val requestParser = new RequestParser(graph.config)
+ val requestParser = new RequestParser(graph)
+
/**
* Public APIS
*/
- def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): HandlerResult = {
+ def doPost[A](uri: String, body: String, headers: A)(implicit ev: CanLookup[A]): HandlerResult = {
+ val impKeyOpt = ev.lookup(headers, Experiment.ImpressionKey)
+ val impIdOpt = ev.lookup(headers, Experiment.ImpressionId)
+
try {
val jsQuery = Json.parse(body)
uri match {
- case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
- case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
- case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
- case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+// case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toSimpleVertexArrJson))
+ case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toJson))
+// case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
+// case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
+// case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
case "/graphs/checkEdges" => checkEdges(jsQuery)
- case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
- case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
- case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+// case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
+// case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
+// case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery))
+ case "/graphs/experiments" => experiments(jsQuery)
case uri if uri.startsWith("/graphs/experiment") =>
val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3)
experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt)
@@ -75,17 +95,8 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
try {
val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue)
- HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs =>
- val edgeJsons = for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- convertedEdge = if (isReverted) edge.duplicateEdge else edge
- edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
- } yield Json.toJson(edgeJson)
-
- Json.toJson(edgeJsons)
+ HandlerResult(graph.checkEdges(quads).map { case stepResult =>
+ PostProcess.toJson(graph, QueryOption(), stepResult)
})
} catch {
case e: Exception => HandlerResult(Future.failed(e))
@@ -93,11 +104,23 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
}
- /**
- * Private APIS
- */
- private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String]): HandlerResult = {
+ private def experiments(jsQuery: JsValue): HandlerResult = {
+ val params: Seq[RequestParser.ExperimentParam] = requestParser.parseExperiment(jsQuery)
+
+ val results = params map { case (body, token, experimentName, uuid, impKeyOpt) =>
+ val handlerResult = experiment(body, token, experimentName, uuid, impKeyOpt)
+ val future = handlerResult.body.recover {
+ case e: Exception => PostProcess.emptyResults ++ Json.obj("error" -> Json.obj("reason" -> e.getMessage))
+ }
+
+ future
+ }
+
+ val result = Future.sequence(results).map(JsArray)
+ HandlerResult(body = result)
+ }
+ private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String] = None): HandlerResult = {
try {
val bucketOpt = for {
service <- Service.findByAccessToken(accessToken)
@@ -108,7 +131,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found"))
if (bucket.isGraphQuery) {
val ret = buildRequestInner(contentsBody, bucket, uuid)
- HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId)
+ HandlerResult(ret.body, Experiment.ImpressionKey -> bucket.impressionId)
}
else throw new RuntimeException("not supported yet")
} catch {
@@ -127,119 +150,54 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
val experimentLog = s"POST $path took -1 ms 200 -1 $body"
logger.debug(experimentLog)
- doPost(path, body)
- }
- }
-
- private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
- val filterOutQueryResultsLs = q.filterOutQuery match {
- case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
- case None => Future.successful(Seq.empty)
- }
-
- for {
- queryResultsLs <- graph.getEdges(q)
- filterOutResultsLs <- filterOutQueryResultsLs
- } yield {
- val json = post(queryResultsLs, filterOutResultsLs)
- json
+ doPost(path, body, Experiment.ImpressionId -> bucket.impressionId)
}
}
- def getEdgesAsync(jsonQuery: JsValue)
- (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-
- val fetch = eachQuery(post) _
+ def getEdgesAsync(jsonQuery: JsValue, impIdOpt: Option[String] = None)
+ (post: (Graph, QueryOption, StepResult) => JsValue): Future[JsValue] = {
jsonQuery match {
- case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
case obj@JsObject(_) =>
(obj \ "queries").asOpt[JsValue] match {
- case None => fetch(requestParser.toQuery(obj))
+ case None =>
+ val query = requestParser.toQuery(obj, impIdOpt)
+ graph.getEdges(query).map(post(graph, query.queryOption, _))
case _ =>
- val multiQuery = requestParser.toMultiQuery(obj)
- val filterOutFuture = multiQuery.queryOption.filterOutQuery match {
- case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
- case None => Future.successful(Seq.empty)
- }
- val futures = multiQuery.queries.zip(multiQuery.weights).map { case (query, weight) =>
- val filterOutQueryResultsLs = query.queryOption.filterOutQuery match {
- case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
- case None => Future.successful(Seq.empty)
- }
- for {
- queryRequestWithResultLs <- graph.getEdges(query)
- filterOutResultsLs <- filterOutQueryResultsLs
- } yield {
- val newQueryRequestWithResult = for {
- queryRequestWithResult <- queryRequestWithResultLs
- queryResult = queryRequestWithResult.queryResult
- } yield {
- val newEdgesWithScores = for {
- edgeWithScore <- queryRequestWithResult.queryResult.edgeWithScoreLs
- } yield {
- edgeWithScore.copy(score = edgeWithScore.score * weight)
- }
- queryRequestWithResult.copy(queryResult = queryResult.copy(edgeWithScoreLs = newEdgesWithScores))
- }
- logger.debug(s"[Size]: ${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}")
- (newQueryRequestWithResult, filterOutResultsLs)
- }
- }
- for {
- filterOut <- filterOutFuture
- resultWithExcludeLs <- Future.sequence(futures)
- } yield {
- PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, resultWithExcludeLs, filterOut)
- // val initial = (ListBuffer.empty[QueryRequestWithResult], ListBuffer.empty[QueryRequestWithResult])
- // val (results, excludes) = resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), (results, excludes)) =>
- // (prevResults ++= results, prevExcludes ++= excludes)
- // }
- // PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ filterOut)
- }
+ val multiQuery = requestParser.toMultiQuery(obj, impIdOpt)
+ graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _))
}
- case _ => throw BadQueryException("Cannot support")
- }
- }
-
- private def getEdgesExcludedAsync(jsonQuery: JsValue)
- (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
- val q = requestParser.toQuery(jsonQuery)
- val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
- val fetchFuture = graph.getEdges(q)
- val excludeFuture = graph.getEdges(filterOutQuery)
+ case JsArray(arr) =>
+ val queries = arr.map(requestParser.toQuery(_, impIdOpt))
+ val weights = queries.map(_ => 1.0)
+ val multiQuery = MultiQuery(queries, weights, QueryOption(), jsonQuery)
+ graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _))
- for {
- queryResultLs <- fetchFuture
- exclude <- excludeFuture
- } yield {
- post(queryResultLs, exclude)
+ case _ => throw BadQueryException("Cannot support")
}
}
private def getVertices(jsValue: JsValue) = {
val jsonQuery = jsValue
- val ts = System.currentTimeMillis()
- val props = "{}"
val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
val serviceName = (js \ "serviceName").as[String]
val columnName = (js \ "columnName").as[String]
- for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
- Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
+ for {
+ idJson <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])
+ id <- jsValueToAny(idJson)
+ } yield {
+ Vertex.toVertex(serviceName, columnName, id)
}
}
graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
}
+
private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = {
var body = bucket.requestBody.replace("#uuid", uuid)
- // // replace variable
- // body = TemplateHelper.replaceVariable(System.currentTimeMillis(), body)
-
- // replace param
for {
requestKeyJson <- requestKeyJsonOpt
jsObj <- requestKeyJson.asOpt[JsObject]
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index eaa25af..a6e81b4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -19,12 +19,10 @@
package org.apache.s2graph.core.storage
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.{Executors, TimeUnit}
import com.typesafe.config.Config
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.s2graph.core.ExceptionHandler.{KafkaMessage, Key, Val}
import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
@@ -37,13 +35,15 @@ import org.apache.s2graph.core.utils.{Extensions, logger}
import scala.annotation.tailrec
import scala.collection.Seq
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Promise, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Random, Try}
-abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
+abstract class Storage[R](val graph: Graph,
+ val config: Config)(implicit ec: ExecutionContext) {
import HBaseType._
/** storage dependent configurations */
+ val DeleteAllFetchCount = config.getInt("delete.all.fetch.count")
val MaxRetryNum = config.getInt("max.retry.number")
val MaxBackOff = config.getInt("max.back.off")
val BackoffTimeout = config.getInt("back.off.timeout")
@@ -57,11 +57,15 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
/** retry scheduler */
val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
+
/** handle mutate failed */
val exceptionHandler = new ExceptionHandler(config)
-
val failTopic = s"mutateFailed_${config.getString("phase")}"
+ /** fallback */
+ val fallback = Future.successful(StepResult.Empty)
+ val innerFallback = Future.successful(StepInnerResult.Empty)
+
/**
* Compatibility table
* | label schema version | snapshot edge | index edge | vertex | note |
@@ -229,7 +233,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
* @return
*/
def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)],
- prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]]
+ prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepInnerResult]]
/**
* fetch Vertex for given request from storage.
@@ -324,7 +328,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = {
val (strongEdges, weakEdges) =
- edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk"))
+ (edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")))
val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map { case (zkQuorum, edges) =>
val mutations = edges.flatMap { edge =>
@@ -449,7 +453,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
future recoverWith {
case FetchTimeoutException(retryEdge) =>
logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
- /* fetch failed. re-fetch should be done */
+ /** fetch failed. re-fetch should be done */
fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
@@ -465,14 +469,14 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
- /* retry logic */
+ /** retry logic */
val promise = Promise[Boolean]
val backOff = exponentialBackOff(tryNum)
scheduledThreadPool.schedule(new Runnable {
override def run(): Unit = {
val future = if (failedStatusCode == 0) {
// acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
- /* fetch failed. re-fetch should be done */
+ /** fetch failed. re-fetch should be done */
fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
@@ -505,7 +509,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
case 0 =>
fetchedSnapshotEdgeOpt match {
case None =>
- /*
+ /**
* no one has never mutated this SN.
* (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
* pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1)
@@ -527,7 +531,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
case Some(snapshotEdge) =>
snapshotEdge.pendingEdgeOpt match {
case None =>
- /*
+ /**
* others finished commit on this SN. but there is no contention.
* (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges)
* pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ?
@@ -549,7 +553,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
case Some(pendingEdge) =>
val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
if (isLockExpired) {
- /*
+ /**
* if pendingEdge.ts == snapshotEdge.ts =>
* (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge))
* else =>
@@ -571,7 +575,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
} else {
- /*
+ /**
* others finished commit on this SN and there is currently contention.
* this can't be proceed so retry from re-fetch.
* throw EX
@@ -584,11 +588,11 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
case _ =>
- /*
+ /**
* statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock
*/
- /*
+ /**
* this succeed to lock this SN. keep doing on commit process.
* if SN.isEmpty =>
* no one never succed to commit on this SN.
@@ -828,90 +832,96 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
/** Delete All */
- protected def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest,
- queryResult: QueryResult,
+ protected def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepInnerResult,
requestTs: Long,
retryNum: Int): Future[Boolean] = {
- val queryParam = queryRequest.queryParam
- val zkQuorum = queryParam.label.hbaseZkAddr
- val futures = for {
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- } yield {
- /* reverted direction */
- val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
- indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- buildIncrementsAsync(indexEdge, -1L)
- }
- val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
- val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
- indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- buildIncrementsAsync(indexEdge, -1L)
+ if (stepInnerResult.isEmpty) Future.successful(true)
+ else {
+ val head = stepInnerResult.edgesWithScoreLs.head
+ val zkQuorum = head.edge.label.hbaseZkAddr
+ val futures = for {
+ edgeWithScore <- stepInnerResult.edgesWithScoreLs
+ (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+ } yield {
+ /** reverted direction */
+ val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+ indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ buildIncrementsAsync(indexEdge, -1L)
+ }
+ val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+ val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
+ indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ buildIncrementsAsync(indexEdge, -1L)
+ }
+ val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+ writeToStorage(zkQuorum, mutations, withWait = true)
}
- val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
- writeToStorage(zkQuorum, mutations, withWait = true)
- }
- Future.sequence(futures).map { rets => rets.forall(identity) }
+ Future.sequence(futures).map { rets => rets.forall(identity) }
+ }
}
- protected def buildEdgesToDelete(queryRequestWithResultLs: QueryRequestWithResult, requestTs: Long): QueryResult = {
- val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs).get
- val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore =>
+ protected def buildEdgesToDelete(stepInnerResult: StepInnerResult, requestTs: Long): StepInnerResult = {
+ val filtered = stepInnerResult.edgesWithScoreLs.filter { edgeWithScore =>
(edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
- }.map { edgeWithScore =>
- val label = queryRequest.queryParam.label
- val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
- case "strong" =>
- val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
- Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
- (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
- case _ =>
- val oldEdge = edgeWithScore.edge
- (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
- }
+ }
+ if (filtered.isEmpty) StepInnerResult.Empty
+ else {
+ val head = filtered.head
+ val label = head.edge.label
+ val edgeWithScoreLs = filtered.map { edgeWithScore =>
+ val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
+ case "strong" =>
+ val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
+ Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
+ (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
+ case _ =>
+ val oldEdge = edgeWithScore.edge
+ (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
+ }
- val copiedEdge =
- edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
+ val copiedEdge =
+ edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
- val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
-// logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
- edgeToDelete
+ val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
+ // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
+ edgeToDelete
+ }
+ //Degree edge?
+ StepInnerResult(edgeWithScoreLs, Nil, false)
}
-
- queryResult.copy(edgeWithScoreLs = edgeWithScoreLs)
}
- protected def deleteAllFetchedEdgesLs(queryRequestWithResultLs: Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = {
- val queryResultLs = queryRequestWithResultLs.map(_.queryResult)
- queryResultLs.foreach { queryResult =>
- if (queryResult.isFailure) throw new RuntimeException("fetched result is fallback.")
+ protected def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepInnerResult],
+ requestTs: Long): Future[(Boolean, Boolean)] = {
+ stepInnerResultLs.foreach { stepInnerResult =>
+ if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
}
val futures = for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs)
- if deleteQueryResult.edgeWithScoreLs.nonEmpty
+ stepInnerResult <- stepInnerResultLs
+ deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
+ if deleteStepInnerResult.edgesWithScoreLs.nonEmpty
} yield {
- val label = queryRequest.queryParam.label
+ val head = deleteStepInnerResult.edgesWithScoreLs.head
+ val label = head.edge.label
label.schemaVersion match {
case HBaseType.VERSION3 | HBaseType.VERSION4 =>
if (label.consistencyLevel == "strong") {
- /*
+ /**
* read: snapshotEdge on queryResult = O(N)
* write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
*/
- mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity))
+ mutateEdges(deleteStepInnerResult.edgesWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity))
} else {
- deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+ deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
}
case _ =>
- /*
+ /**
* read: x
* write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
*/
- deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+ deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
}
}
@@ -923,10 +933,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
}
- protected def fetchAndDeleteAll(query: Query, requestTs: Long): Future[(Boolean, Boolean)] = {
+ protected def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
val future = for {
- queryRequestWithResultLs <- getEdges(query)
- (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, requestTs)
+ stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_)))
+ (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
} yield {
// logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
(allDeleted, ret)
@@ -960,19 +970,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
val requestTs = ts
- val queryParams = for {
+ /** create query per label */
+ val queries = for {
label <- labels
} yield {
val labelWithDir = LabelWithDirection(label.id.get, dir)
- QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
+ val queryParam = QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
+ val step = Step(List(queryParam))
+ Query(srcVertices, Vector(step))
}
- val step = Step(queryParams.toList)
- val q = Query(srcVertices, Vector(step))
-
// Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
- val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) {
- fetchAndDeleteAll(q, requestTs)
+ val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
+ fetchAndDeleteAll(queries, requestTs)
} { case (allDeleted, deleteSuccess) =>
allDeleted && deleteSuccess
}.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
@@ -1039,7 +1049,9 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
queryParam: QueryParam,
prevScore: Double = 1.0,
isInnerCall: Boolean,
- parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+ parentEdges: Seq[EdgeWithScore],
+ startOffset: Int = 0,
+ len: Int = Int.MaxValue): Seq[EdgeWithScore] = {
if (kvs.isEmpty) Seq.empty
else {
val first = kvs.head
@@ -1050,7 +1062,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)
for {
- kv <- kvs
+ (kv, idx) <- kvs.zipWithIndex if idx >= startOffset && idx < startOffset + len
edge <-
if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, isInnerCall, parentEdges)
else toEdge(kv, queryParam, cacheElementOpt, parentEdges)
@@ -1071,19 +1083,6 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
/** End Of Parse Logic */
-// /** methods for consistency */
-// protected def writeAsyncSimple(zkQuorum: String, elementRpcs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
-// if (elementRpcs.isEmpty) {
-// Future.successful(true)
-// } else {
-// val futures = elementRpcs.map { rpc => writeToStorage(rpc, withWait) }
-// Future.sequence(futures).map(_.forall(identity))
-// }
-// }
-
-
- // def futureCache[T] = Cache[Long, (Long, T)]
-
protected def toRequestEdge(queryRequest: QueryRequest): Edge = {
val srcVertex = queryRequest.vertex
// val tgtVertexOpt = queryRequest.tgtVertexOpt
@@ -1095,7 +1094,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match {
case Some(tgtVertexId) => // _to is given.
- /* we use toSnapshotEdge so dont need to swap src, tgt */
+ /** we use toSnapshotEdge so dont need to swap src, tgt */
val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion)
(src, tgt)
@@ -1135,27 +1134,26 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
}
- protected def fetchStep(orgQuery: Query, queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = {
- if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil)
+ protected def fetchStep(orgQuery: Query,
+ stepIdx: Int,
+ stepInnerResult: StepInnerResult): Future[StepInnerResult] = {
+ if (stepInnerResult.isEmpty) Future.successful(StepInnerResult.Empty)
else {
- val queryRequest = queryRequestWithResultsLs.head.queryRequest
- val q = orgQuery
- val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult)
+ val edgeWithScoreLs = stepInnerResult.edgesWithScoreLs
- val stepIdx = queryRequest.stepIdx + 1
+ val q = orgQuery
val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
val step = q.steps(stepIdx)
+
val alreadyVisited =
if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
- else Graph.alreadyVisitedVertices(queryResultsLs)
+ else Graph.alreadyVisitedVertices(stepInnerResult.edgesWithScoreLs)
- val groupedBy = queryResultsLs.flatMap { queryResult =>
- queryResult.edgeWithScoreLs.map { case edgeWithScore =>
- edgeWithScore.edge.tgtVertex -> edgeWithScore
- }
+ val groupedBy = edgeWithScoreLs.map { case edgeWithScore =>
+ edgeWithScore.edge.tgtVertex -> edgeWithScore
}.groupBy { case (vertex, edgeWithScore) => vertex }
val groupedByFiltered = for {
@@ -1178,39 +1176,48 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
queryParam <- step.queryParams
} yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore)
- Graph.filterEdges(fetches(queryRequests, prevStepTgtVertexIdEdges), alreadyVisited)(ec)
+ val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
+ Graph.filterEdges(orgQuery, stepIdx, queryRequests.map(_._1), fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited)(ec)
}
}
-
- protected def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = {
- for {
- queryRequestWithResultLs <- queryRequestWithResultLsFuture
- ret <- fetchStep(orgQuery, queryRequestWithResultLs)
- } yield ret
+ private def getEdgesStepInner(q: Query): Future[StepInnerResult] = {
+ Try {
+ if (q.steps.isEmpty) innerFallback
+ else {
+ // current stepIdx = -1
+ val startStepInnerResult = QueryResult.fromVertices(q)
+ q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
+ for {
+ prevStepInnerResult <- prevStepInnerResultFuture
+ currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult)
+ } yield currentStepInnerResult
+ }
+ }
+ } recover {
+ case e: Exception =>
+ logger.error(s"getEdgesAsync: $e", e)
+ innerFallback
+ } get
}
-
- def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = {
- val fallback = {
- val queryRequest = QueryRequest(query = q, stepIdx = 0, q.vertices.head, queryParam = QueryParam.Empty)
- Future.successful(q.vertices.map(v => QueryRequestWithResult(queryRequest, QueryResult())))
- }
+ def getEdges(q: Query): Future[StepResult] = {
Try {
-
if (q.steps.isEmpty) {
// TODO: this should be get vertex query.
fallback
} else {
- // current stepIdx = -1
- val startQueryResultLs = QueryResult.fromVertices(q)
- q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, step) =>
- fetchStepFuture(q, acc)
-// fetchStepFuture(q, acc).map { stepResults =>
-// step.queryParams.zip(stepResults).foreach { case (qParam, queryRequestWithResult) =>
-// val cursor = Base64.getEncoder.encodeToString(queryRequestWithResult.queryResult.tailCursor)
-// qParam.cursorOpt = Option(cursor)
-// }
-// stepResults
-// }
+ val filterOutFuture = q.queryOption.filterOutQuery match {
+ case None => innerFallback
+ case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+ }
+ for {
+ innerResult <- getEdgesStepInner(q)
+ filterOutInnerResult <- filterOutFuture
+ } yield {
+ val result = StepResult(graph, q.queryOption, innerResult)
+ if (filterOutInnerResult.isEmpty) result
+ else {
+ StepResult.filterOut(graph, q.queryOption, result, filterOutInnerResult)
+ }
}
}
} recover {
@@ -1220,7 +1227,34 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
} get
}
- def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = {
+ def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
+ val fallback = Future.successful(StepResult.Empty)
+
+ Try {
+ if (mq.queries.isEmpty) fallback
+ else {
+ val filterOutFuture = mq.queryOption.filterOutQuery match {
+ case None => innerFallback
+ case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+ }
+
+ val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
+ for {
+ multiQueryResults <- multiQueryFutures
+ filterOutInnerResult <- filterOutFuture
+ } yield {
+ val merged = StepResult.merges(mq.queryOption, multiQueryResults, mq.weights)
+ StepResult.filterOut(graph, mq.queryOption, merged, filterOutInnerResult)
+ }
+ }
+ } recover {
+ case e: Exception =>
+ logger.error(s"getEdgesAsync: $e", e)
+ fallback
+ } get
+ }
+
+ def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = {
val ts = System.currentTimeMillis()
val futures = for {
(srcVertex, tgtVertex, queryParam) <- params
@@ -1228,15 +1262,18 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
} yield {
fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
- val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
- val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
- val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
- val queryResult = QueryResult(edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0)))
- QueryRequestWithResult(queryRequest, queryResult)
+ edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0))
}
}
- Future.sequence(futures)
+ Future.sequence(futures).map { edgeWithScoreLs =>
+ val s2EdgeWithScoreLs = edgeWithScoreLs.flatMap { ls =>
+ ls.map { edgeWithScore =>
+ S2EdgeWithScore(edgeWithScore.edge, edgeWithScore.score)
+ }
+ }
+ StepResult(results = s2EdgeWithScoreLs, grouped = Nil, degreeEdges = Nil)
+ }
}
@@ -1266,6 +1303,13 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
}
+
+ protected def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+ val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
+ edgeWithScores.map { edgeWithScore =>
+ edgeWithScore.copy(score = edgeWithScore.score / sum)
+ }
+ }
/** end of query */
/** Mutation Builder */
@@ -1290,19 +1334,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
(edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
case (true, true) =>
- /* when there is no need to update. shouldUpdate == false */
+ /** when there is no need to update. shouldUpdate == false */
List.empty
case (true, false) =>
- /* no edges to delete but there is new edges to insert so increase degree by 1 */
+ /** no edges to delete but there is new edges to insert so increase degree by 1 */
edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
case (false, true) =>
- /* no edges to insert but there is old edges to delete so decrease degree by 1 */
+ /** no edges to insert but there is old edges to delete so decrease degree by 1 */
edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
case (false, false) =>
- /* update on existing edges so no change on degree */
+ /** update on existing edges so no change on degree */
List.empty
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 138216b..b52ba53 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -82,8 +82,9 @@ object AsynchbaseStorage {
}
-class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext)
- extends Storage[Deferred[QueryRequestWithResult]](config) {
+class AsynchbaseStorage(override val graph: Graph,
+ override val config: Config)(implicit ec: ExecutionContext)
+ extends Storage[Deferred[StepInnerResult]](graph, config) {
import Extensions.DeferOps
@@ -100,14 +101,16 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
private val emptyKeyValues = new util.ArrayList[KeyValue]()
private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
- import CanDefer._
-
/** Future Cache to squash request */
- private val futureCache = new DeferCache[QueryResult, Deferred, Deferred](config, QueryResult(), "FutureCache", useMetric = true)
+ private val futureCache = new DeferCache[StepInnerResult, Deferred, Deferred](config, StepInnerResult.Empty, "FutureCache", useMetric = true)
/** Simple Vertex Cache */
private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
+ private val zkQuorum = config.getString("hbase.zookeeper.quorum")
+ private val zkQuorumSlave =
+ if (config.hasPath("hbase.zookeeper.quorum")) Option(config.getString("hbase.zookeeper.quorum"))
+ else None
/**
* fire rpcs into proper hbase cluster using client and
@@ -241,7 +244,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
scanner.setMaxVersions(1)
- scanner.setMaxNumRows(queryParam.limit)
+ scanner.setMaxNumRows(queryParam.offset + queryParam.limit)
scanner.setMaxTimestamp(maxTs)
scanner.setMinTimestamp(minTs)
scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
@@ -280,21 +283,38 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
override def fetch(queryRequest: QueryRequest,
prevStepScore: Double,
isInnerCall: Boolean,
- parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = {
+ parentEdges: Seq[EdgeWithScore]): Deferred[StepInnerResult] = {
+
+ def fetchInner(hbaseRpc: AnyRef): Deferred[StepInnerResult] = {
+ val queryParam = queryRequest.queryParam
- def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
- val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
- val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
- sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
- } else edgeWithScores
- QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
-// QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty)))
+ val (startOffset, length) = queryParam.label.schemaVersion match {
+ case HBaseType.VERSION4 => (queryParam.offset, queryParam.limit)
+ case _ => (0, kvs.length)
+ }
+
+ val edgeWithScores = toEdges(kvs, queryParam, prevStepScore, isInnerCall, parentEdges, startOffset, length)
+ if (edgeWithScores.isEmpty) StepInnerResult.Empty
+ else {
+ val head = edgeWithScores.head
+ val (degreeEdges, indexEdges) =
+ if (head.edge.isDegree) (Seq(head), edgeWithScores.tail)
+ else (Nil, edgeWithScores)
+ val normalized =
+ if (queryRequest.queryParam.shouldNormalize) normalize(indexEdges)
+ else indexEdges
+
+ val sampled = if (queryRequest.queryParam.sample >= 0) {
+ sample(queryRequest, normalized, queryRequest.queryParam.sample)
+ } else normalized
+
+ StepInnerResult(edgesWithScoreLs = sampled, degreeEdges)
+ }
} recoverWith { ex =>
logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
- QueryResult(isFailure = true)
-// QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
+ StepInnerResult.Failure
}
}
@@ -302,27 +322,25 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
val cacheTTL = queryParam.cacheTTLInMillis
val request = buildRequest(queryRequest)
- val defer =
- if (cacheTTL <= 0) fetchInner(request)
- else {
- val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
- val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
- futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+ if (cacheTTL <= 0) fetchInner(request)
+ else {
+ val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+ val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+ futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
}
- defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)}
}
override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)],
- prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[QueryRequestWithResult]] = {
- val defers: Seq[Deferred[QueryRequestWithResult]] = for {
+ prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[StepInnerResult]] = {
+ val defers: Seq[Deferred[StepInnerResult]] = for {
(queryRequest, prevStepScore) <- queryRequestWithScoreLs
parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
} yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
- val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers)
+ val grouped: Deferred[util.ArrayList[StepInnerResult]] = Deferred.group(defers)
grouped withCallback {
- queryResults: util.ArrayList[QueryRequestWithResult] =>
+ queryResults: util.ArrayList[StepInnerResult] =>
queryResults.toIndexedSeq
} toFuture
}
@@ -371,47 +389,56 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
}
- override def createTable(zkAddr: String,
+ override def createTable(_zkAddr: String,
tableName: String,
cfs: List[String],
regionMultiplier: Int,
ttl: Option[Int],
compressionAlgorithm: String): Unit = {
- logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
- val admin = getAdmin(zkAddr)
- val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
- try {
- if (!admin.tableExists(TableName.valueOf(tableName))) {
- try {
- val desc = new HTableDescriptor(TableName.valueOf(tableName))
- desc.setDurability(Durability.ASYNC_WAL)
- for (cf <- cfs) {
- val columnDesc = new HColumnDescriptor(cf)
- .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
- .setBloomFilterType(BloomType.ROW)
- .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
- .setMaxVersions(1)
- .setTimeToLive(2147483647)
- .setMinVersions(0)
- .setBlocksize(32768)
- .setBlockCacheEnabled(true)
- if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
- desc.addFamily(columnDesc)
- }
+ /** TODO: Decide if we will allow each app server to connect to multiple hbase cluster */
+ for {
+ zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
+ } {
+ logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
+ val admin = getAdmin(zkAddr)
+ val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+ try {
+ if (!admin.tableExists(TableName.valueOf(tableName))) {
+ try {
+ val desc = new HTableDescriptor(TableName.valueOf(tableName))
+ desc.setDurability(Durability.ASYNC_WAL)
+ for (cf <- cfs) {
+ val columnDesc = new HColumnDescriptor(cf)
+ .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+ .setBloomFilterType(BloomType.ROW)
+ .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+ .setMaxVersions(1)
+ .setTimeToLive(2147483647)
+ .setMinVersions(0)
+ .setBlocksize(32768)
+ .setBlockCacheEnabled(true)
+ if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+ desc.addFamily(columnDesc)
+ }
- if (regionCount <= 1) admin.createTable(desc)
- else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
- } catch {
- case e: Throwable =>
- logger.error(s"$zkAddr, $tableName failed with $e", e)
- throw e
+ if (regionCount <= 1) admin.createTable(desc)
+ else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+ } catch {
+ case e: Throwable =>
+ logger.error(s"$zkAddr, $tableName failed with $e", e)
+ throw e
+ }
+ } else {
+ logger.info(s"$zkAddr, $tableName, $cfs already exist.")
}
- } else {
- logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+ } catch {
+ case e: Throwable =>
+ logger.error(s"$zkAddr, $tableName failed with $e", e)
+ throw e
+ } finally {
+ admin.close()
+ admin.getConnection.close()
}
- } finally {
- admin.close()
- admin.getConnection.close()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index 83d4338..c700e53 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -63,5 +63,5 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
else if (indexEdge.op == GraphUtil.operations("incrementCount"))
Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
else propsToKeyValues(indexEdge.metas.toSeq)
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
index 4149540..b402c0f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
@@ -51,6 +51,8 @@ object logger {
private val logger = LoggerFactory.getLogger("application")
private val errorLogger = LoggerFactory.getLogger("error")
private val metricLogger = LoggerFactory.getLogger("metrics")
+ private val queryLogger = LoggerFactory.getLogger("query")
+ private val malformedLogger = LoggerFactory.getLogger("malformed")
def metric[T: Loggable](msg: => T) = metricLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
@@ -61,6 +63,10 @@ object logger {
def error[T: Loggable](msg: => T, exception: => Throwable) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
def error[T: Loggable](msg: => T) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg))
+
+ def query[T: Loggable](msg: => T) = queryLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
+
+ def malformed[T: Loggable](msg: => T, exception: => Throwable) = malformedLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
index a018c01..6933320 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
@@ -19,10 +19,12 @@
package org.apache.s2graph.core
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
import org.apache.s2graph.core.utils.logger
import org.scalatest.FunSuite
+import play.api.libs.json.{JsObject, Json}
class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
initTests()
@@ -39,7 +41,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
val (srcId, tgtId, labelName) = ("1", "2", testLabelName)
val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield {
- Management.toEdge(ts.toLong, op, srcId, tgtId, labelName, "out", props).toLogString
+ val properties = fromJsonToProperties(Json.parse(props).as[JsObject])
+ Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, op).toLogString
}).mkString("\n")
val expected = Seq(
[3/3] incubator-s2graph git commit: [S2GRAPH-121]: Create `Result` class to hold traverse result edges.
Posted by st...@apache.org.
[S2GRAPH-121]: Create `Result` class to hold traverse result edges.
JIRA:
[S2GRAPH-121] https://issues.apache.org/jira/browse/S2GRAPH-121
Pull Request:
Closes #96
Authors
DO YUNG YOON: steamshon@apache.org
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/8dbb9a3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/8dbb9a3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/8dbb9a3e
Branch: refs/heads/master
Commit: 8dbb9a3eeff6f412fe5defee2baffee32f174981
Parents: b590831
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Nov 16 16:56:41 2016 +0100
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Nov 16 17:01:07 2016 +0100
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/s2graph/core/mysqls/schema.sql | 1 +
.../scala/org/apache/s2graph/core/Edge.scala | 82 ++-
.../apache/s2graph/core/ExceptionHandler.scala | 19 +-
.../scala/org/apache/s2graph/core/Graph.scala | 232 ++++---
.../apache/s2graph/core/GraphExceptions.scala | 2 +
.../org/apache/s2graph/core/Management.scala | 69 +--
.../org/apache/s2graph/core/OrderingUtil.scala | 7 +
.../org/apache/s2graph/core/PostProcess.scala | 601 ++++---------------
.../org/apache/s2graph/core/QueryParam.scala | 75 ++-
.../org/apache/s2graph/core/QueryResult.scala | 234 +++++++-
.../scala/org/apache/s2graph/core/Vertex.scala | 74 ++-
.../apache/s2graph/core/mysqls/Experiment.scala | 3 +-
.../org/apache/s2graph/core/mysqls/Label.scala | 59 +-
.../s2graph/core/mysqls/ServiceColumn.scala | 31 +-
.../s2graph/core/parsers/WhereParser.scala | 2 +-
.../s2graph/core/rest/RequestParser.scala | 177 ++++--
.../apache/s2graph/core/rest/RestHandler.scala | 182 +++---
.../apache/s2graph/core/storage/Storage.scala | 336 ++++++-----
.../core/storage/hbase/AsynchbaseStorage.scala | 147 +++--
.../indexedge/wide/IndexEdgeSerializable.scala | 2 +-
.../org/apache/s2graph/core/utils/Logger.scala | 6 +
.../org/apache/s2graph/core/EdgeTest.scala | 5 +-
.../s2graph/core/Integrate/CrudTest.scala | 163 +++--
.../core/Integrate/IntegrateCommon.scala | 10 +-
.../s2graph/core/Integrate/QueryTest.scala | 159 ++---
.../core/Integrate/StrongLabelDeleteTest.scala | 14 +-
.../core/Integrate/VertexTestHelper.scala | 7 +-
.../core/Integrate/WeakLabelDeleteTest.scala | 9 +-
.../apache/s2graph/core/JsonParserTest.scala | 3 +-
.../apache/s2graph/core/QueryParamTest.scala | 56 +-
.../s2graph/core/TestCommonWithModels.scala | 2 +-
.../core/benchmark/JsonBenchmarkSpec.scala | 62 +-
.../core/benchmark/SamplingBenchmarkSpec.scala | 207 +++----
.../org/apache/s2graph/rest/netty/Server.scala | 96 ++-
.../apache/s2graph/rest/play/Bootstrap.scala | 4 +-
.../s2graph/rest/play/config/Config.scala | 4 +
.../controllers/ApplicationController.scala | 6 +-
.../play/controllers/CounterController.scala | 21 +-
.../rest/play/controllers/EdgeController.scala | 164 +++--
.../play/controllers/ExperimentController.scala | 5 +-
.../play/controllers/PublishController.scala | 8 -
.../rest/play/controllers/QueryController.scala | 21 +-
.../play/controllers/VertexController.scala | 18 +-
s2rest_play/conf/reference.conf | 1 +
s2rest_play/conf/routes | 7 +-
46 files changed, 1892 insertions(+), 1503 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0c8edc3..73b3597 100644
--- a/CHANGES
+++ b/CHANGES
@@ -94,6 +94,8 @@ Release 0.1.0 - unreleased
S2GRAPH-127: Refactor ExceptionHander Object into Class (Committed by DOYUNG YOON).
+ S2GRAPH-121: Create `Result` class to hold traverse result edges (Committed by DOYUNG YOON).
+
BUG FIXES
S2GRAPH-18: Query Option "interval" is Broken.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
index d8ee5bc..822df6c 100644
--- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
+++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
@@ -96,6 +96,7 @@ CREATE TABLE `labels` (
`is_async` tinyint(4) NOT NULL default '0',
`compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4',
`options` text,
+ `deleted_at` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_label` (`label`),
INDEX `idx_labels_src_column_name` (`src_column_name`),
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 4d4afe0..97730f3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -19,10 +19,11 @@
package org.apache.s2graph.core
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.JSONParser._
import play.api.libs.json.{JsNumber, Json}
import scala.collection.JavaConversions._
import scala.util.hashing.MurmurHash3
@@ -112,7 +113,7 @@ case class IndexEdge(srcVertex: Vertex,
lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v.innerVal
- // lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
+// lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
//TODO:
// lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList
@@ -151,8 +152,23 @@ case class Edge(srcVertex: Vertex,
val schemaVer = label.schemaVersion
val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong
+ lazy val srcId = srcVertex.innerIdVal
+ lazy val tgtId = tgtVertex.innerIdVal
+ lazy val labelName = label.label
+ lazy val direction = GraphUtil.fromDirection(labelWithDir.dir)
+ lazy val properties = toProps()
+
def props = propsWithTs.mapValues(_.innerVal)
+
+ private def toProps(): Map[String, Any] = {
+ for {
+ (labelMeta, defaultVal) <- label.metaPropsDefaultMapInner
+ } yield {
+ labelMeta.name -> propsWithTs.getOrElse(labelMeta.seq, defaultVal).innerVal.value
+ }
+ }
+
def relatedEdges = {
if (labelWithDir.isDirected) {
val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
@@ -204,10 +220,10 @@ case class Edge(srcVertex: Vertex,
def isDegree = propsWithTs.contains(LabelMeta.degreeSeq)
- // def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
- // case Some(_) => props
- // case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
- // }
+// def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
+// case Some(_) => props
+// case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
+// }
def propsPlusTsValid = propsWithTs.filter(kv => kv._1 >= 0)
@@ -290,8 +306,31 @@ case class Edge(srcVertex: Vertex,
ret.mkString("\t")
}
+
+ def selectValues(selectColumns: Seq[String],
+ useToString: Boolean = true,
+ score: Double = 0.0): Seq[Option[Any]] = {
+ //TODO: Option should be matched in JsonParser anyTo*
+ for {
+ selectColumn <- selectColumns
+ } yield {
+ val valueOpt = selectColumn match {
+ case LabelMeta.from.name | "from" => Option(srcId)
+ case LabelMeta.to.name | "to" => Option(tgtId)
+ case "label" => Option(labelName)
+ case "direction" => Option(direction)
+ case "score" => Option(score)
+ case LabelMeta.timestamp.name | "timestamp" => Option(ts)
+ case _ =>
+ properties.get(selectColumn)
+ }
+ if (useToString) valueOpt.map(_.toString)
+ else valueOpt
+ }
+ }
}
+
case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
newSnapshotEdge: Option[SnapshotEdge] = None) {
@@ -310,6 +349,33 @@ object Edge {
val incrementVersion = 1L
val minTsVal = 0L
+ def toEdge(srcId: Any,
+ tgtId: Any,
+ labelName: String,
+ direction: String,
+ props: Map[String, Any] = Map.empty,
+ ts: Long = System.currentTimeMillis(),
+ operation: String = "insert"): Edge = {
+ val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+
+ val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion)
+ val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion)
+
+ val srcColId = label.srcColumn.id.get
+ val tgtColId = label.tgtColumn.id.get
+
+ val srcVertex = Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis())
+ val tgtVertex = Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis())
+ val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+
+ val labelWithDir = LabelWithDirection(label.id.get, dir)
+ val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
+ val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+ val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+ new Edge(srcVertex, tgtVertex, labelWithDir, op = op, version = ts, propsWithTs = propsWithTs)
+ }
+
/** now version information is required also **/
type State = Map[Byte, InnerValLikeWithTs]
type PropsPairWithTs = (State, State, Long, String)
@@ -387,7 +453,7 @@ object Edge {
// logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
// logger.error(s"$propsWithTs")
- (requestEdge, edgeMutate)
+ (requestEdge.copy(propsWithTs = propsWithTs), edgeMutate)
}
}
@@ -582,7 +648,7 @@ object Edge {
(propsWithTs, true)
}
- def fromString(s: String): Option[Edge] = Graph.toEdge(s)
+// def fromString(s: String): Option[Edge] = Graph.toEdge(s)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index d03d483..48976d3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -24,8 +24,11 @@ import java.util.Properties
import com.typesafe.config.Config
import org.apache.kafka.clients.producer._
import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.JsValue
class ExceptionHandler(config: Config) {
+
+
import ExceptionHandler._
val keyBrokerList = "kafka.metadata.broker.list"
@@ -43,7 +46,6 @@ class ExceptionHandler(config: Config) {
}
} else None
-
def enqueue(m: KafkaMessage): Unit = {
producer match {
case None => logger.debug(s"skip log to Kafka: ${m}")
@@ -68,24 +70,33 @@ object ExceptionHandler {
type Key = String
type Val = String
- def toKafkaMessage(topic: String, element: GraphElement, originalString: Option[String] = None) = {
+ def toKafkaMessage(topic: String,
+ element: GraphElement,
+ originalString: Option[String] = None,
+ produceJson: Boolean = false) = {
+ val edgeString = originalString.getOrElse(element.toLogString())
+ val msg = edgeString
+
KafkaMessage(
new ProducerRecord[Key, Val](
topic,
element.queuePartitionKey,
- originalString.getOrElse(element.toLogString())))
+ msg))
}
+ // only used in deleteAll
def toKafkaMessage(topic: String, tsv: String) = {
KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
}
+ def toKafkaMessage(topic: String, jsValue: JsValue): KafkaMessage = toKafkaMessage(topic, jsValue.toString())
+
case class KafkaMessage(msg: ProducerRecord[Key, Val])
private def toKafkaProp(config: Config) = {
val props = new Properties()
- /* all default configuration for new producer */
+ /** all default configuration for new producer */
val brokers =
if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
else "localhost"
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index c25b71a..2cbddea 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -23,11 +23,13 @@ import java.util
import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{Label, Model}
import org.apache.s2graph.core.parsers.WhereParser
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.{JsObject, Json}
import scala.collection.JavaConversions._
import scala.collection._
@@ -58,6 +60,7 @@ object Graph {
"back.off.timeout" -> java.lang.Integer.valueOf(1000),
"hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
"delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
+ "delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
"future.cache.max.size" -> java.lang.Integer.valueOf(100000),
"future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
@@ -84,10 +87,9 @@ object Graph {
(hashKey, filterHashKey)
}
- def alreadyVisitedVertices(queryResultLs: Seq[QueryResult]): Map[(LabelWithDirection, Vertex), Boolean] = {
+ def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, Vertex), Boolean] = {
val vertices = for {
- queryResult <- queryResultLs
- edgeWithScore <- queryResult.edgeWithScoreLs
+ edgeWithScore <- edgeWithScoreLs
edge = edgeWithScore.edge
vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
} yield (edge.labelWithDir, vertex) -> true
@@ -132,81 +134,36 @@ object Graph {
tsVal
}
- def aggregateScore(newScore: Double,
- resultEdges: ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
- duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
- edgeWithScoreSorted: ListBuffer[(HashKey, FilterHashKey, Edge, Double)],
- hashKey: HashKey,
- filterHashKey: FilterHashKey,
- queryParam: QueryParam,
- convertedEdge: Edge) = {
-
- /* skip duplicate policy check if consistencyLevel is strong */
- if (queryParam.label.consistencyLevel != "strong" && resultEdges.containsKey(hashKey)) {
- val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey)
+ def processDuplicates(queryParam: QueryParam,
+ duplicates: Seq[(FilterHashKey, EdgeWithScore)]): Seq[(FilterHashKey, EdgeWithScore)] = {
+
+ if (queryParam.label.consistencyLevel != "strong") {
//TODO:
queryParam.duplicatePolicy match {
- case Query.DuplicatePolicy.First => // do nothing
- case Query.DuplicatePolicy.Raw =>
- if (duplicateEdges.containsKey(hashKey)) {
- duplicateEdges.get(hashKey).append(convertedEdge -> newScore)
- } else {
- val newBuffer = new ListBuffer[(Edge, Double)]
- newBuffer.append(convertedEdge -> newScore)
- duplicateEdges.put(hashKey, newBuffer)
- }
+ case Query.DuplicatePolicy.First => Seq(duplicates.head)
+ case Query.DuplicatePolicy.Raw => duplicates
case Query.DuplicatePolicy.CountSum =>
- resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 1))
+ val countSum = duplicates.size
+ Seq(duplicates.head._1 -> duplicates.head._2.copy(score = countSum))
case _ =>
- resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + newScore))
+ val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + current._2.score }
+ Seq(duplicates.head._1 -> duplicates.head._2.copy(score = scoreSum))
}
} else {
- resultEdges.put(hashKey, (filterHashKey, convertedEdge, newScore))
- edgeWithScoreSorted.append((hashKey, filterHashKey, convertedEdge, newScore))
+ duplicates
}
}
-
- def aggregateResults(queryRequestWithResult: QueryRequestWithResult,
- queryParamResult: Result,
- edgesToInclude: util.HashSet[FilterHashKey],
- edgesToExclude: util.HashSet[FilterHashKey]): QueryRequestWithResult = {
- val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- val (query, stepIdx, _, queryParam) = QueryRequest.unapply(queryRequest).get
-
- val (duplicateEdges, resultEdges, edgeWithScoreSorted) = queryParamResult
- val edgesWithScores = for {
- (hashKey, filterHashKey, edge, _) <- edgeWithScoreSorted if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
- score = resultEdges.get(hashKey)._3
- (duplicateEdge, aggregatedScore) <- fetchDuplicatedEdges(edge, score, hashKey, duplicateEdges) if aggregatedScore >= queryParam.threshold
- } yield EdgeWithScore(duplicateEdge, aggregatedScore)
-
- QueryRequestWithResult(queryRequest, QueryResult(edgesWithScores))
- }
-
- def fetchDuplicatedEdges(edge: Edge,
- score: Double,
- hashKey: HashKey,
- duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]) = {
- (edge -> score) +: (if (duplicateEdges.containsKey(hashKey)) duplicateEdges.get(hashKey) else Seq.empty)
- }
-
- def queryResultWithFilter(queryRequestWithResult: QueryRequestWithResult) = {
- val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- val (_, _, _, queryParam) = QueryRequest.unapply(queryRequest).get
- val whereFilter = queryParam.where.get
- if (whereFilter == WhereParser.success) queryResult.edgeWithScoreLs
- else queryResult.edgeWithScoreLs.withFilter(edgeWithScore => whereFilter.filter(edgeWithScore.edge))
- }
-
- def filterEdges(queryResultLsFuture: Future[Seq[QueryRequestWithResult]],
+ def filterEdges(q: Query,
+ stepIdx: Int,
+ queryRequests: Seq[QueryRequest],
+ queryResultLsFuture: Future[Seq[StepInnerResult]],
+ queryParams: Seq[QueryParam],
alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean])
- (implicit ec: scala.concurrent.ExecutionContext): Future[Seq[QueryRequestWithResult]] = {
+ (implicit ec: scala.concurrent.ExecutionContext): Future[StepInnerResult] = {
queryResultLsFuture.map { queryRequestWithResultLs =>
- if (queryRequestWithResultLs.isEmpty) Nil
+ if (queryRequestWithResultLs.isEmpty) StepInnerResult.Empty
else {
- val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get
- val (q, stepIdx, srcVertex, queryParam) = QueryRequest.unapply(queryRequest).get
val step = q.steps(stepIdx)
val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None
@@ -219,73 +176,114 @@ object Graph {
val edgesToExclude = new util.HashSet[FilterHashKey]()
val edgesToInclude = new util.HashSet[FilterHashKey]()
- val queryParamResultLs = new ListBuffer[Result]
- queryRequestWithResultLs.foreach { queryRequestWithResult =>
- val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+ val sequentialLs = new ListBuffer[(HashKey, FilterHashKey, EdgeWithScore)]()
+ val agg = new mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, EdgeWithScore)]]()
+ val params = new mutable.HashMap[HashKey, QueryParam]()
+ var numOfDuplicates = 0
+ var numOfTotal = 0
+ queryRequests.zip(queryRequestWithResultLs).foreach { case (queryRequest, stepInnerResult) =>
val queryParam = queryRequest.queryParam
- val duplicateEdges = new util.concurrent.ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]()
- val resultEdges = new util.concurrent.ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)]()
- val edgeWithScoreSorted = new ListBuffer[(HashKey, FilterHashKey, Edge, Double)]
val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
-
- // store degree value with Array.empty so if degree edge exist, it comes at very first.
- def checkDegree() = queryResult.edgeWithScoreLs.headOption.exists { edgeWithScore =>
- edgeWithScore.edge.isDegree
- }
- var isDegree = checkDegree()
-
val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir
val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey)
val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey)
-
- queryResultWithFilter(queryRequestWithResult).foreach { edgeWithScore =>
- val (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+ val where = queryParam.where.get
+
+ for {
+ edgeWithScore <- stepInnerResult.edgesWithScoreLs
+ if where == WhereParser.success || where.filter(edgeWithScore.edge)
+ (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+ } {
+ numOfTotal += 1
if (queryParam.transformer.isDefault) {
val convertedEdge = edge
- val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+ val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
- /* check if this edge should be exlcuded. */
- if (shouldBeExcluded && !isDegree) {
+ /** check if this edge should be excluded. */
+ if (shouldBeExcluded) {
edgesToExclude.add(filterHashKey)
} else {
- if (shouldBeIncluded && !isDegree) {
+ if (shouldBeIncluded) {
edgesToInclude.add(filterHashKey)
}
val tsVal = processTimeDecay(queryParam, convertedEdge)
val newScore = labelWeight * score * tsVal
- aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+ val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
+ sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
+ agg.get(hashKey) match {
+ case None =>
+ val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
+ newLs += (filterHashKey -> newEdgeWithScore)
+ agg += (hashKey -> newLs)
+ case Some(old) =>
+ numOfDuplicates += 1
+ old += (filterHashKey -> newEdgeWithScore)
+ }
+ params += (hashKey -> queryParam)
}
} else {
convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge =>
- val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+ val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false)
- /* check if this edge should be exlcuded. */
- if (shouldBeExcluded && !isDegree) {
+ /** check if this edge should be excluded. */
+ if (shouldBeExcluded) {
edgesToExclude.add(filterHashKey)
} else {
- if (shouldBeIncluded && !isDegree) {
+ if (shouldBeIncluded) {
edgesToInclude.add(filterHashKey)
}
val tsVal = processTimeDecay(queryParam, convertedEdge)
val newScore = labelWeight * score * tsVal
- aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+ val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore)
+ sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore))
+ agg.get(hashKey) match {
+ case None =>
+ val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]()
+ newLs += (filterHashKey -> newEdgeWithScore)
+ agg += (hashKey -> newLs)
+ case Some(old) =>
+ numOfDuplicates += 1
+ old += (filterHashKey -> newEdgeWithScore)
+ }
+ params += (hashKey -> queryParam)
}
}
}
- isDegree = false
}
- val ret = (duplicateEdges, resultEdges, edgeWithScoreSorted)
- queryParamResultLs.append(ret)
}
- val aggregatedResults = for {
- (queryRequestWithResult, queryParamResult) <- queryRequestWithResultLs.zip(queryParamResultLs)
- } yield {
- aggregateResults(queryRequestWithResult, queryParamResult, edgesToInclude, edgesToExclude)
+
+ val edgeWithScoreLs = new ListBuffer[EdgeWithScore]()
+ if (numOfDuplicates == 0) {
+ // no duplicates at all.
+ for {
+ (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
+ if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+ } {
+ edgeWithScoreLs += edgeWithScore
}
+ } else {
+ // need to resolve duplicates.
+ val seen = new mutable.HashSet[HashKey]()
+ for {
+ (hashKey, filterHashKey, edgeWithScore) <- sequentialLs
+ if !seen.contains(hashKey)
+ if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+ } {
+ val queryParam = params(hashKey)
+ val duplicates = processDuplicates(queryParam, agg(hashKey))
+ duplicates.foreach { case (_, duplicate) =>
+ if (duplicate.score >= queryParam.threshold) {
+ seen += hashKey
+ edgeWithScoreLs += duplicate
+ }
+ }
+ }
+ }
- aggregatedResults
+ val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
+ StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, degreeEdges = degrees)
}
}
}
@@ -310,7 +308,7 @@ object Graph {
element
} recover {
case e: Exception =>
- logger.error(s"$e", e)
+ logger.error(s"[toElement]: $e", e)
None
} get
@@ -323,37 +321,33 @@ object Graph {
toEdge(GraphUtil.split(s))
}
- //"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}"
- //{"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":1417616431},
def toEdge(parts: Array[String]): Option[Edge] = Try {
val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
- val props = if (parts.length >= 7) parts(6) else "{}"
+ val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
val tempDirection = if (parts.length >= 8) parts(7) else "out"
val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
-
- val edge = Management.toEdge(ts.toLong, operation, srcId, tgtId, label, direction, props)
- // logger.debug(s"toEdge: $edge")
- Some(edge)
+ val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
+ Option(edge)
} recover {
case e: Exception =>
- logger.error(s"toEdge: $e", e)
+ logger.error(s"[toEdge]: $e", e)
throw e
} get
- //"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}"
def toVertex(parts: Array[String]): Option[Vertex] = Try {
val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
- val props = if (parts.length >= 7) parts(6) else "{}"
- Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, colName, props))
+ val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
+ val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
+ Option(vertex)
} recover {
case e: Throwable =>
- logger.error(s"toVertex: $e", e)
+ logger.error(s"[toVertex]: $e", e)
throw e
} get
- def initStorage(config: Config)(ec: ExecutionContext) = {
+ def initStorage(graph: Graph, config: Config)(ec: ExecutionContext) = {
config.getString("s2graph.storage.backend") match {
- case "hbase" => new AsynchbaseStorage(config)(ec)
+ case "hbase" => new AsynchbaseStorage(graph, config)(ec)
case _ => throw new RuntimeException("not supported storage.")
}
}
@@ -366,7 +360,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
Model.loadCache()
// TODO: Make storage client by config param
- val storage = Graph.initStorage(config)(ec)
+ val storage = Graph.initStorage(this, config)(ec)
for {
@@ -375,9 +369,11 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
} logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
/** select */
- def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = storage.checkEdges(params)
+ def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = storage.checkEdges(params)
+
+ def getEdges(q: Query): Future[StepResult] = storage.getEdges(q)
- def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = storage.getEdges(q)
+ def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = storage.getEdgesMultiQuery(mq)
def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
index 0898ffa..2f090cf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
@@ -31,6 +31,8 @@ object GraphExceptions {
case class LabelAlreadyExistException(msg: String) extends Exception(msg)
+ case class LabelNameTooLongException(msg: String) extends Exception(msg)
+
case class InternalException(msg: String) extends Exception(msg)
case class IllegalDataTypeException(msg: String) extends Exception(msg)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 39020c7..9ef7c14 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core
-import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
+import org.apache.s2graph.core.GraphExceptions.{LabelNameTooLongException, InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types.HBaseType._
@@ -48,9 +48,9 @@ object Management {
import HBaseType._
+ val LABEL_NAME_MAX_LENGTH = 100
val DefaultCompressionAlgorithm = "gz"
-
def findService(serviceName: String) = {
Service.findByName(serviceName, useCache = false)
}
@@ -190,62 +190,6 @@ object Management {
}
}
- def toEdge(ts: Long, operation: String, srcId: String, tgtId: String,
- labelStr: String, direction: String = "", props: String): Edge = {
-
- val label = tryOption(labelStr, getServiceLabel)
- val dir =
- if (direction == "")
-// GraphUtil.toDirection(label.direction)
- GraphUtil.directions("out")
- else
- GraphUtil.toDirection(direction)
-
- // logger.debug(s"$srcId, ${label.srcColumnWithDir(dir)}")
- // logger.debug(s"$tgtId, ${label.tgtColumnWithDir(dir)}")
-
- val srcVertexId = toInnerVal(srcId, label.srcColumn.columnType, label.schemaVersion)
- val tgtVertexId = toInnerVal(tgtId, label.tgtColumn.columnType, label.schemaVersion)
-
- val srcColId = label.srcColumn.id.get
- val tgtColId = label.tgtColumn.id.get
- val (srcVertex, tgtVertex) = if (dir == GraphUtil.directions("out")) {
- (Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()),
- Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()))
- } else {
- (Vertex(SourceVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()),
- Vertex(TargetVertexId(srcColId, srcVertexId), System.currentTimeMillis()))
- }
-
- // val dir = if (direction == "") GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction)
- val labelWithDir = LabelWithDirection(label.id.get, dir)
- val op = tryOption(operation, GraphUtil.toOp)
-
- val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
- val parsedProps = toProps(label, jsObject.fields).toMap
- val propsWithTs = parsedProps.map(kv => (kv._1 -> InnerValLikeWithTs(kv._2, ts))) ++
- Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, label.schemaVersion), ts))
-
- Edge(srcVertex, tgtVertex, labelWithDir, op, version = ts, propsWithTs = propsWithTs)
-
- }
-
- def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = {
- Service.findByName(serviceName) match {
- case None => throw new RuntimeException(s"$serviceName does not exist. create service first.")
- case Some(service) =>
- ServiceColumn.find(service.id.get, columnName) match {
- case None => throw new RuntimeException(s"$columnName is not exist. create service column first.")
- case Some(col) =>
- val idVal = toInnerVal(id, col.columnType, col.schemaVersion)
- val op = tryOption(operation, GraphUtil.toOp)
- val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
- val parsedProps = toProps(col, jsObject).toMap
- Vertex(VertexId(col.id.get, idVal), ts, parsedProps, op = op)
- }
- }
- }
-
def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = {
val props = for {
@@ -346,7 +290,7 @@ class Management(graph: Graph) {
schemaVersion: String = DEFAULT_VERSION,
isAsync: Boolean,
compressionAlgorithm: String = "gz",
- options: Option[String] = None): Try[Label] = {
+ options: Option[String]): Try[Label] = {
val labelOpt = Label.findByName(label, useCache = false)
@@ -355,7 +299,8 @@ class Management(graph: Graph) {
case Some(l) =>
throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
case None =>
- /* create all models */
+ /** create all models */
+ if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : 40 )")
val newLabel = Label.insertAll(label,
srcServiceName, srcColumnName, srcColumnType,
tgtServiceName, tgtColumnName, tgtColumnType,
@@ -387,8 +332,8 @@ class Management(graph: Graph) {
* copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
*/
def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
- val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists."))
- if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
+ val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
+ if (Label.findByName(newLabelName, useCache = false).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
val allIndices = old.indices.map { index => Index(index.name, index.propNames) }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
index 0ecbf4e..eaadb39 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
@@ -49,8 +49,15 @@ object OrderingUtil {
case (xv: Long, yv: Long) => implicitly[Ordering[Long]].compare(xv, yv)
case (xv: Double, yv: Double) => implicitly[Ordering[Double]].compare(xv, yv)
case (xv: String, yv: String) => implicitly[Ordering[String]].compare(xv, yv)
+ case (xv: BigDecimal, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(xv, yv)
case (xv: JsValue, yv: JsValue) => implicitly[Ordering[JsValue]].compare(xv, yv)
case (xv: InnerValLike, yv: InnerValLike) => implicitly[Ordering[InnerValLike]].compare(xv, yv)
+ case (xv: BigDecimal, yv: Long) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+ case (xv: Long, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
+ case (xv: BigDecimal, yv: Int) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+ case (xv: Int, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
+ case (xv: BigDecimal, yv: Double) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv))
+ case (xv: Double, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 9ab08b3..7cc2420 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -20,11 +20,13 @@
package org.apache.s2graph.core
import org.apache.s2graph.core.GraphExceptions.BadQueryException
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
-import play.api.libs.json.{Json, _}
import org.apache.s2graph.core.JSONParser._
-import scala.collection.mutable.ListBuffer
+import play.api.libs.json.{Json, _}
+
+import scala.collection.immutable
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
object PostProcess {
@@ -32,6 +34,7 @@ object PostProcess {
type EDGE_VALUES = Map[String, JsValue]
type ORDER_BY_VALUES = (Any, Any, Any, Any)
type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES)
+ type GROUP_BY_KEY = Map[String, JsValue]
/**
* Result Entity score field name
@@ -47,523 +50,153 @@ object PostProcess {
val SCORE_FIELD_NAME = "scoreSum"
val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props")
- def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
- val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
- // filterNot {case (edge, score) => edge.props.contains(LabelMeta.degreeSeq)}
- val groupedEdgesWithRank = (for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields))
- } yield {
- (queryRequest.queryParam, edge, score)
- }).groupBy {
- case (queryParam, edge, rank) if edge.labelWithDir.dir == GraphUtil.directions("in") =>
- (queryParam.label.srcColumn, queryParam.label.label, queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree)
- case (queryParam, edge, rank) =>
- (queryParam.label.tgtColumn, queryParam.label.label, queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree)
- }
- val ret = for {
- ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) <- groupedEdgesWithRank if !isDegreeEdge
- edgesWithRanks = edgesAndRanks.groupBy(x => x._2.srcVertex).map(_._2.head)
- id <- innerValToJsValue(target, tgtColumn.columnType)
- } yield {
- Json.obj("name" -> tgtColumn.columnName, "id" -> id,
- SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum,
- "label" -> labelName,
- "aggr" -> Json.obj(
- "name" -> srcColumn.columnName,
- "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) =>
- innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)
- },
- "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) =>
- Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType),
- "props" -> propsToJson(edge),
- "score" -> rank
- )
- }
- )
- )
- }
-
- ret.toList
- }
-
- def sortWithFormatted(jsons: Seq[JsObject],
- scoreField: String = "scoreSum",
- queryRequestWithResultLs: Seq[QueryRequestWithResult],
- decrease: Boolean = true): JsObject = {
- val ordering = if (decrease) -1 else 1
- val sortedJsons = jsons.sortBy { jsObject => (jsObject \ scoreField).as[Double] * ordering }
-
- if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
- else Json.obj(
- "size" -> sortedJsons.size,
- "results" -> sortedJsons,
- "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId()
- )
- }
-
- private def toHashKey(edge: Edge, queryParam: QueryParam, fields: Seq[String], delimiter: String = ","): Int = {
- val ls = for {
- field <- fields
- } yield {
- field match {
- case "from" | "_from" => edge.srcVertex.innerId.toIdString()
- case "to" | "_to" => edge.tgtVertex.innerId.toIdString()
- case "label" => edge.labelWithDir.labelId
- case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
- case "_timestamp" | "timestamp" => edge.ts
- case _ =>
- queryParam.label.metaPropsInvMap.get(field) match {
- case None => throw new RuntimeException(s"unknow column: $field")
- case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match {
- case None => labelMeta.defaultValue
- case Some(propVal) => propVal
- }
- }
- }
+ def s2EdgeParent(graph: Graph, // Renders the ancestor chain of parentEdges as JSON, recursing through edge.parentEdges.
+ parentEdges: Seq[EdgeWithScore]): JsValue = { // graph is only forwarded to the recursive call
+ if (parentEdges.isEmpty) JsNull // no parents: nothing to render
+ else {
+ val ancestors = for {
+ current <- parentEdges
+ parents = s2EdgeParent(graph, current.edge.parentEdges) if parents != JsNull // guard skips any parent that itself has no parentEdges
+ } yield {
+ val s2Edge = current.edge.originalEdgeOpt.getOrElse(current.edge) // prefer the recorded original edge when present
+ s2EdgeToJsValue(s2Edge, current.score, false, parents = parents) // isDegree = false: full edge form with nested parents
+ }
+ Json.toJson(ancestors) // a JSON array, possibly empty, never JsNull
}
- val ret = ls.hashCode()
- ret
- }
-
- def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], isSrcVertex: Boolean = false): Seq[Int] = {
- for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- q = queryRequest.query
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- } yield toHashKey(edge, queryRequest.queryParam, q.filterOutFields)
- }
-
- def summarizeWithListExcludeFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
- val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
- sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs, decrease = true)
}
- def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
- val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
- sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs)
- }
-
- def summarizeWithListFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
- val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
- sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs)
- }
-
- def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult]): JsValue = {
- toSimpleVertexArrJson(queryRequestWithResultLs, Seq.empty[QueryRequestWithResult])
- }
-
- private def orderBy(queryOption: QueryOption,
- rawEdges: ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]): ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)] = {
- import OrderingUtil._
-
- if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
- val ascendingLs = queryOption.orderByColumns.map(_._2)
- rawEdges.sortBy(_._3)(TupleMultiOrdering[Any](ascendingLs))
+ def s2EdgeToJsValue(s2Edge: Edge, // Serialises one edge to a JSON object; degree edges get a reduced form.
+ score: Double, // emitted as "score" in the non-degree form only
+ isDegree: Boolean = false,
+ parents: JsValue = JsNull): JsValue = { // JsNull unless the caller supplies rendered ancestors
+ if (isDegree) { // degree form: only from, label and the degree value are emitted
+ Json.obj(
+ "from" -> anyValToJsValue(s2Edge.srcId),
+ "label" -> s2Edge.labelName,
+ LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degreeSeq).innerVal.value) // NOTE(review): apply-style lookup; presumably fails if the degree prop is missing — confirm callers only pass degree edges
+ )
} else {
- rawEdges
+ Json.obj("from" -> anyValToJsValue(s2Edge.srcId), // full form
+ "to" -> anyValToJsValue(s2Edge.tgtId),
+ "label" -> s2Edge.labelName,
+ "score" -> score,
+ "props" -> JSONParser.propertiesToJson(s2Edge.properties),
+ "direction" -> s2Edge.direction,
+ "timestamp" -> anyValToJsValue(s2Edge.ts),
+ "parents" -> parents // always present as a key, even when it is JsNull
+ )
}
}
- private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, edge: Edge, column: String): Any = {
- column match {
- case "score" => score
- case "timestamp" | "_timestamp" => edge.ts
- case _ =>
- keyWithJs.get(column) match {
- case None => keyWithJs.get("props").map { js => (js \ column).as[JsValue] }.get
- case Some(x) => x
- }
+ def withImpressionId(queryOption: QueryOption, // Builds the response envelope: size, degrees, results and, when set, the impression id.
+ size: Int,
+ degrees: Seq[JsValue],
+ results: Seq[JsValue]): JsValue = {
+ queryOption.impIdOpt match { // the two branches differ only in the extra impression-id field
+ case None => Json.obj( // no impression id: three-field envelope
+ "size" -> size,
+ "degrees" -> degrees,
+ "results" -> results
+ )
+ case Some(impId) =>
+ Json.obj(
+ "size" -> size,
+ "degrees" -> degrees,
+ "results" -> results,
+ Experiment.ImpressionKey -> impId // field name comes from Experiment.ImpressionKey
+ )
}
}
+ def s2VertexToJson(s2Vertex: Vertex): Option[JsValue] = { // Serialises a vertex to JSON; None when its id cannot be converted.
+ val props = for {
+ (k, v) <- s2Vertex.properties
+ jsVal <- anyValToJsValue(v) // properties whose value does not convert are left out
+ } yield k -> jsVal
- private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): JsValue = {
- def traverse(js: JsValue): JsValue = js match {
- case JsNull => mapper(JsNull)
- case JsNumber(v) => mapper(js)
- case JsString(v) => mapper(js)
- case JsBoolean(v) => mapper(js)
- case JsArray(elements) => JsArray(elements.map { t => traverse(mapper(t)) })
- case JsObject(values) => JsObject(values.map { case (k, v) => k -> traverse(mapper(v)) })
+ for {
+ id <- anyValToJsValue(s2Vertex.innerIdVal) // the whole result is None if this conversion yields nothing
+ } yield {
+ Json.obj(
+ "serviceName" -> s2Vertex.serviceName,
+ "columnName" -> s2Vertex.columnName,
+ "id" -> id,
+ "props" -> Json.toJson(props),
+ "timestamp" -> s2Vertex.ts
+ )
}
-
- traverse(jsValue)
}
- /** test query with filterOut is not working since it can not diffrentate filterOut */
- private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): JsValue = {
- val cursors = _cursors.flatten.iterator
+ def verticesToJson(s2Vertices: Seq[Vertex]): JsValue = // Serialises the vertices via s2VertexToJson into one JSON value.
+ Json.toJson(s2Vertices.flatMap(s2VertexToJson(_))) // flatMap skips vertices for which s2VertexToJson returns None
- buildReplaceJson(jsonQuery) {
- case js@JsObject(fields) =>
- val isStep = fields.find { case (k, _) => k == "label" } // find label group
- if (isStep.isEmpty) js
- else {
- // TODO: Order not ensured
- val withCursor = js.fieldSet | Set("cursor" -> JsString(cursors.next))
- JsObject(withCursor.toSeq)
- }
- case js => js
- }
- }
+ def withOptionalFields(queryOption: QueryOption,
+ size: Int,
+ degrees: Seq[JsValue],
+ results: Seq[JsValue],
+ failCount: Int = 0,
+ cursors: => JsValue,
+ nextQuery: => Option[JsValue]): JsValue = {
- private def buildRawEdges(queryOption: QueryOption,
- queryRequestWithResultLs: Seq[QueryRequestWithResult],
- excludeIds: Map[Int, Boolean],
- scoreWeight: Double = 1.0): (ListBuffer[JsValue], ListBuffer[RAW_EDGE]) = {
- val degrees = ListBuffer[JsValue]()
- val rawEdges = ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]()
- val metaPropNamesMap = scala.collection.mutable.Map[String, Int]()
- for {
- queryRequestWithResult <- queryRequestWithResultLs
- } yield {
- val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- queryRequest.queryParam.label.metaPropNames.foreach { metaPropName =>
- metaPropNamesMap.put(metaPropName, metaPropNamesMap.getOrElse(metaPropName, 0) + 1)
- }
- }
- val propsExistInAll = metaPropNamesMap.filter(kv => kv._2 == queryRequestWithResultLs.length)
- val orderByColumns = queryOption.orderByColumns.filter { case (column, _) =>
- column match {
- case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => true
- case _ =>
- propsExistInAll.contains(column)
-// //TODO??
-// false
-// queryParam.label.metaPropNames.contains(column)
- }
- }
+ val kvs = new ArrayBuffer[(String, JsValue)]()
- /* build result jsons */
- for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- queryParam = queryRequest.queryParam
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- hashKey = toHashKey(edge, queryRequest.queryParam, queryOption.filterOutFields)
- if !excludeIds.contains(hashKey)
- } {
+ kvs.append("size" -> JsNumber(size))
+ kvs.append("degrees" -> JsArray(degrees))
+ kvs.append("results" -> JsArray(results))
- // edge to json
- val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
- val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType)
- if (edge.isDegree && fromOpt.isDefined) {
- if (queryOption.limitOpt.isEmpty) {
- degrees += Json.obj(
- "from" -> fromOpt.get,
- "label" -> queryRequest.queryParam.label.label,
- "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir),
- LabelMeta.degree.name -> innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG)
- )
- }
- } else {
- val keyWithJs = edgeToJson(edge, score, queryRequest.query, queryRequest.queryParam)
- val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match {
- case 0 =>
- (None, None, None, None)
- case 1 =>
- val it = orderByColumns.iterator
- val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- (v1, None, None, None)
- case 2 =>
- val it = orderByColumns.iterator
- val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- (v1, v2, None, None)
- case 3 =>
- val it = orderByColumns.iterator
- val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- (v1, v2, v3, None)
- case _ =>
- val it = orderByColumns.iterator
- val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1)
- (v1, v2, v3, v4)
- }
+ if (queryOption.impIdOpt.isDefined) kvs.append(Experiment.ImpressionKey -> JsString(queryOption.impIdOpt.get))
- val currentEdge = (keyWithJs, score * scoreWeight, orderByValues)
- rawEdges += currentEdge
- }
- }
- (degrees, rawEdges)
+ JsObject(kvs)
}
- private def buildResultJsValue(queryOption: QueryOption,
- degrees: ListBuffer[JsValue],
- rawEdges: ListBuffer[RAW_EDGE]): JsValue = {
- if (queryOption.groupByColumns.isEmpty) {
- // ordering
- val filteredEdges = rawEdges.filter(t => t._2 >= queryOption.scoreThreshold)
+ def toJson(graph: Graph,
+ queryOption: QueryOption,
+ stepResult: StepResult): JsValue = {
- val edges = queryOption.limitOpt match {
- case None => orderBy(queryOption, filteredEdges).map(_._1)
- case Some(limit) => orderBy(queryOption, filteredEdges).map(_._1).take(limit)
- }
- val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees
- Json.obj(
- "size" -> edges.size,
- "degrees" -> resultDegrees,
-// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
- "results" -> edges
- )
- } else {
- val grouped = rawEdges.groupBy { case (keyWithJs, _, _) =>
- val props = keyWithJs.get("props")
+ val degrees =
+ if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(t.s2Edge, t.score, true))
+ else emptyDegrees
- for {
- column <- queryOption.groupByColumns
- value <- keyWithJs.get(column) match {
- case None => props.flatMap { js => (js \ column).asOpt[JsValue] }
- case Some(x) => Some(x)
- }
- } yield column -> value
+ if (queryOption.groupBy.keys.isEmpty) {
+ // no group by specified on query.
+
+ val ls = stepResult.results.map { t =>
+ val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
+ s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
}
+ withImpressionId(queryOption, ls.size, degrees, ls)
+ } else {
- val groupedEdgesWithScoreSum =
+ val results =
for {
- (groupByKeyVals, groupedRawEdges) <- grouped
- scoreSum = groupedRawEdges.map(x => x._2).sum if scoreSum >= queryOption.scoreThreshold
+ (groupByValues, (scoreSum, edges)) <- stepResult.grouped
} yield {
- // ordering
- val edges = orderBy(queryOption, groupedRawEdges).map(_._1)
+ val groupByKeyValues = queryOption.groupBy.keys.zip(groupByValues).map { case (k, valueOpt) =>
+ k -> valueOpt.flatMap(anyValToJsValue).getOrElse(JsNull)
+ }
+ val groupByValuesJson = Json.toJson(groupByKeyValues.toMap)
- //TODO: refactor this
- val js = if (queryOption.returnAgg)
+ if (!queryOption.returnAgg) {
Json.obj(
- "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+ "groupBy" -> groupByValuesJson,
"scoreSum" -> scoreSum,
- "agg" -> edges
+ "agg" -> Json.arr()
)
- else
+ } else {
+ val agg = edges.map { t =>
+ val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull
+ s2EdgeToJsValue(t.s2Edge, t.score, false, parents)
+ }
+ val aggJson = Json.toJson(agg)
Json.obj(
- "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+ "groupBy" -> groupByValuesJson,
"scoreSum" -> scoreSum,
- "agg" -> Json.arr()
+ "agg" -> aggJson
)
- (js, scoreSum)
- }
-
- val groupedSortedJsons = queryOption.limitOpt match {
- case None =>
- groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1)
- case Some(limit) =>
- groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1).take(limit)
- }
- val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees
- Json.obj(
- "size" -> groupedSortedJsons.size,
- "degrees" -> resultDegrees,
-// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
- "results" -> groupedSortedJsons
- )
- }
- }
-
- def toSimpleVertexArrJsonMulti(queryOption: QueryOption,
- resultWithExcludeLs: Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])],
- excludes: Seq[QueryRequestWithResult]): JsValue = {
- val excludeIds = (Seq((Seq.empty, excludes)) ++ resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, excludes)) =>
- acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap
- }
-
- val (degrees, rawEdges) = (ListBuffer.empty[JsValue], ListBuffer.empty[RAW_EDGE])
- for {
- (result, localExclude) <- resultWithExcludeLs
- } {
- val newResult = result.map { queryRequestWithResult =>
- val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- val newQuery = queryRequest.query.copy(queryOption = queryOption)
- queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = newQuery))
- }
- val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, excludeIds)
- degrees ++= _degrees
- rawEdges ++= _rawEdges
- }
- buildResultJsValue(queryOption, degrees, rawEdges)
- }
-
- def toSimpleVertexArrJson(queryOption: QueryOption,
- queryRequestWithResultLs: Seq[QueryRequestWithResult],
- exclude: Seq[QueryRequestWithResult]): JsValue = {
- val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
- val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds)
- buildResultJsValue(queryOption, degrees, rawEdges)
- }
-
-
- def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult],
- exclude: Seq[QueryRequestWithResult]): JsValue = {
-
- queryRequestWithResultLs.headOption.map { queryRequestWithResult =>
- val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- val query = queryRequest.query
- val queryOption = query.queryOption
- val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
- val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds)
- buildResultJsValue(queryOption, degrees, rawEdges)
- } getOrElse emptyResults
- }
-
- def verticesToJson(vertices: Iterable[Vertex]) = {
- Json.toJson(vertices.flatMap { v => vertexToJson(v) })
- }
-
- def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
- for {
- (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
- labelMeta <- queryParam.label.metaPropsMap.get(seq)
- jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType)
- } yield labelMeta.name -> jsValue
- }
-
- private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, queryParam: QueryParam): JsValue = {
- if (parentEdges.isEmpty) {
- JsNull
- } else {
- val parents = for {
- parent <- parentEdges
- (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get
- parentQueryParam = QueryParam(parentEdge.labelWithDir)
- parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if parents != JsNull
- } yield {
- val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge)
- val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, parentQueryParam) + ("parents" -> parents)
- Json.toJson(edgeJson)
- }
-
- Json.toJson(parents)
- }
- }
-
- /** TODO */
- def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
- val (srcColumn, tgtColumn) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
-
- val kvMapOpt = for {
- from <- innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType)
- to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType)
- } yield {
- val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props"
- val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam)
- val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap
-
- val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, column) =>
- val jsValue = column match {
- case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - (System.currentTimeMillis() - queryParam.timestamp))
- case "from" => from
- case "to" => to
- case "label" => JsString(queryParam.label.label)
- case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
- case "_timestamp" | "timestamp" => JsNumber(edge.ts)
- case "score" => JsNumber(score)
- case "props" if propsMap.nonEmpty => Json.toJson(propsMap)
- case _ => JsNull
+ }
}
-
- if (jsValue == JsNull) map else map + (column -> jsValue)
- }
- kvMap
- }
-
- kvMapOpt.getOrElse(Map.empty)
- }
-
- def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = {
- val kvs = edgeToJsonInner(edge, score, q, queryParam)
- if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> Json.toJson(edgeParent(edge.parentEdges, q, queryParam)))
- else kvs
- }
-
- def vertexToJson(vertex: Vertex): Option[JsObject] = {
- val serviceColumn = ServiceColumn.findById(vertex.id.colId)
-
- for {
- id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType)
- } yield {
- Json.obj("serviceName" -> serviceColumn.service.serviceName,
- "columnName" -> serviceColumn.columnName,
- "id" -> id, "props" -> propsToJson(vertex),
- "timestamp" -> vertex.ts,
- // "belongsTo" -> vertex.belongLabelIds)
- "belongsTo" -> vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label)))
+ withImpressionId(queryOption, results.size, degrees, results)
}
}
-
- private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, InnerValLike]) = {
- for {
- (seq, value) <- props
- name <- seqsToNames.get(seq)
- } yield (name, value)
- }
-
- private def propsToJson(vertex: Vertex) = {
- val serviceColumn = vertex.serviceColumn
- val props = for {
- (propKey, innerVal) <- vertex.props
- columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, propKey.toByte, useCache = true)
- jsValue <- innerValToJsValue(innerVal, columnMeta.dataType)
- } yield {
- (columnMeta.name -> jsValue)
- }
- props.toMap
- }
-
- def propsToJson(edge: Edge) = {
- for {
- (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
- metaProp <- edge.label.metaPropsMap.get(seq)
- jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType)
- } yield {
- (metaProp.name, jsValue)
- }
- }
-
- def summarizeWithListExclude(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = {
- val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
-
-
- val groupedEdgesWithRank = (for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields))
- } yield {
- (edge, score)
- }).groupBy { case (edge, score) =>
- (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId)
- }
-
- val jsons = for {
- ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank
- (edges, ranks) = edgesAndRanks.groupBy(x => x._1.srcVertex).map(_._2.head).unzip
- tgtId <- innerValToJsValue(target, tgtColumn.columnType)
- } yield {
- Json.obj(tgtColumn.columnName -> tgtId,
- s"${srcColumn.columnName}s" ->
- edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)), "scoreSum" -> ranks.sum)
- }
- val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ "scoreSum").as[Double] }.reverse
- if (queryRequestWithResultLs.isEmpty) {
- Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
- } else {
- Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons,
- "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId())
- }
-
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 28d78a9..ef40688 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core
import com.google.common.hash.Hashing
import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.parsers.{Where, WhereParser}
import org.apache.s2graph.core.types.{HBaseSerializable, InnerVal, InnerValLike, LabelWithDirection}
@@ -51,6 +52,12 @@ object Query {
}
}
+object GroupBy { // companion holding the shared default instance
+ val Empty = GroupBy() // all defaults: no keys, limit = Int.MaxValue
+}
+case class GroupBy(keys: Seq[String] = Nil, // group-by keys; empty means no grouping was requested
+ limit: Int = Int.MaxValue) // StepResult.mergeOrdered keeps at most this many edges per merged group
+
case class MultiQuery(queries: Seq[Query],
weights: Seq[Double],
queryOption: QueryOption,
@@ -58,7 +65,7 @@ case class MultiQuery(queries: Seq[Query],
case class QueryOption(removeCycle: Boolean = false,
selectColumns: Seq[String] = Seq.empty,
- groupByColumns: Seq[String] = Seq.empty,
+ groupBy: GroupBy = GroupBy.Empty,
orderByColumns: Seq[(String, Boolean)] = Seq.empty,
filterOutQuery: Option[Query] = None,
filterOutFields: Seq[String] = Seq(LabelMeta.to.name),
@@ -67,8 +74,11 @@ case class QueryOption(removeCycle: Boolean = false,
limitOpt: Option[Int] = None,
returnAgg: Boolean = true,
scoreThreshold: Double = Double.MinValue,
- returnDegree: Boolean = true)
-
+ returnDegree: Boolean = true,
+ impIdOpt: Option[String] = None) {
+ val orderByKeys = orderByColumns.map(_._1)
+ val ascendingVals = orderByColumns.map(_._2)
+}
case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
steps: IndexedSeq[Step] = Vector.empty[Step],
@@ -77,8 +87,7 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
val removeCycle = queryOption.removeCycle
val selectColumns = queryOption.selectColumns
-// val groupBy = queryOption.groupBy
- val groupByColumns = queryOption.groupByColumns
+ val groupBy = queryOption.groupBy
val orderByColumns = queryOption.orderByColumns
val filterOutQuery = queryOption.filterOutQuery
val filterOutFields = queryOption.filterOutFields
@@ -90,7 +99,7 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
def cacheKeyBytes: Array[Byte] = {
val selectBytes = Bytes.toBytes(queryOption.selectColumns.toString)
- val groupBytes = Bytes.toBytes(queryOption.groupByColumns.toString)
+ val groupBytes = Bytes.toBytes(queryOption.groupBy.keys.toString)
val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString)
val filterOutBytes = queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte])
val returnTreeBytes = Bytes.toBytes(queryOption.returnTree)
@@ -279,11 +288,23 @@ case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = S
bytes
}
}
+case class S2Request(labelName: String, // Request described by label name and direction; derived fields are resolved eagerly at construction.
+ direction: String = "out",
+ ts: Long = System.currentTimeMillis(), // default is evaluated when the instance is created
+ options: Map[String, Any] = Map.empty) {
+ val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) // construction throws when the label name is unknown
+ val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) // construction throws when toDir yields nothing for the direction string
+ val labelWithDir = LabelWithDirection(label.id.get, dir) // NOTE(review): label.id.get throws if id is None — presumably always set for a label found by name; confirm
+ // TODO: merge options into queryParam; options is not read anywhere in this class yet.
+ val queryParam = QueryParam(labelWithDir, ts)
+}
object QueryParam { // companion: shared defaults and constants for QueryParam
lazy val Empty = QueryParam(LabelWithDirection(0, 0))
lazy val DefaultThreshold = Double.MinValue
val Delimiter = ","
+ val maxMetaByte = (-1).toByte // byte value -1, i.e. 0xFF
+ val fillArray = Array.fill(100)(maxMetaByte) // 100 bytes of maxMetaByte; paddingInterval appends it to the encoded `from` props
}
case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System.currentTimeMillis()) {
@@ -381,13 +402,13 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System
}
def limit(offset: Int, limit: Int): QueryParam = {
- /* since degree info is located on first always */
- if (offset == 0 && this.columnRangeFilter == null) {
- this.limit = limit + 1
- this.offset = offset
- } else {
- this.limit = limit
- this.offset = offset + 1
+ /** since degree info is located on first always */
+ this.limit = limit
+ this.offset = offset
+
+ if (this.columnRangeFilter == null) {
+ if (offset == 0) this.limit = limit + 1
+ else this.offset = offset + 1
}
// this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, this.offset)
this
@@ -400,26 +421,24 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System
}
}
- def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = {
- // val len = label.orderTypes.size.toByte
- // val len = label.extraIndicesMap(labelOrderSeq).sortKeyTypes.size.toByte
- // logger.error(s"indicesMap: ${label.indicesMap(labelOrderSeq)}")
- val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
+ def paddingInterval(len: Byte, from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]) = { // Encodes from/to props into the (min, max) byte bounds used by interval.
+ val fromVal = Bytes.add(propsToBytes(from), QueryParam.fillArray) // `from` bytes followed by the fillArray padding
+ val toVal = propsToBytes(to) // `to` bytes are used as-is, without padding
- val minMetaByte = InnerVal.minMetaByte
- // val maxMetaByte = InnerVal.maxMetaByte
- val maxMetaByte = -1.toByte
- val toVal = Bytes.add(propsToBytes(to), Array.fill(1)(minMetaByte))
- //FIXME
- val fromVal = Bytes.add(propsToBytes(from), Array.fill(10)(maxMetaByte))
toVal(0) = len // first byte of each bound is overwritten with len
fromVal(0) = len
- val maxBytes = fromVal
- val minBytes = toVal
+
+ val minMax = (toVal, fromVal) // inverted: the `to` bytes are the minimum bound and the `from` bytes the maximum
+ minMax
+ }
+
+ def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = { // Sets the column range filter and its min/max bytes on this QueryParam, then returns it.
+ val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte // number of sort-key types in the index selected by labelOrderSeq
+ val (minBytes, maxBytes) = paddingInterval(len, from, to) // paddingInterval returns the bounds in (min, max) order
+
this.columnRangeFilterMaxBytes = maxBytes
this.columnRangeFilterMinBytes = minBytes
- val rangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true)
- this.columnRangeFilter = rangeFilter
+ this.columnRangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true) // both bounds are passed as inclusive
this
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 5343659..5b2622f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -21,31 +21,42 @@ package org.apache.s2graph.core
import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
-import scala.collection.Seq
+import scala.collection.mutable.ListBuffer
+import scala.collection.{Seq, mutable}
object QueryResult {
- def fromVertices(query: Query): Seq[QueryRequestWithResult] = {
+ def fromVertices(query: Query): StepInnerResult = { // Wraps the query's start vertices as a StepInnerResult of one edge per vertex.
if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) { // without a first step or a first query param there is no label to build edges with
- Seq.empty
+ StepInnerResult.Empty
} else {
val queryParam = query.steps.head.queryParams.head // only the first query param of the first step is consulted
val label = queryParam.label
val currentTs = System.currentTimeMillis()
val propsWithTs = Map(LabelMeta.timeStampSeq ->
InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) // the only prop set is the timestamp, taken from the current time
- for {
+ val edgeWithScoreLs = for {
vertex <- query.vertices
} yield {
- val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
- val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
- QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam),
- QueryResult(edgeWithScoreLs = Seq(edgeWithScore)))
- }
+ val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs) // same vertex passed twice — presumably a src == tgt placeholder edge; confirm
+ val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
+ edgeWithScore
+ }
+ StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, Nil, false) // no degree edges, isFailure = false
}
}
}
-case class QueryRequestWithResult(queryRequest: QueryRequest, queryResult: QueryResult)
+/** Result of an inner traverse step, with canned Failure and Empty instances. */
+object StepInnerResult {
+ val Failure = StepInnerResult(Nil, Nil, true) // no edges, isFailure = true
+ val Empty = StepInnerResult(Nil, Nil, false) // no edges, isFailure = false
+}
+case class StepInnerResult(edgesWithScoreLs: Seq[EdgeWithScore],
+ degreeEdges: Seq[EdgeWithScore],
+ isFailure: Boolean = false) {
+ val isEmpty = edgesWithScoreLs.isEmpty // looks at edgesWithScoreLs only; degreeEdges do not count
+}
case class QueryRequest(query: Query,
stepIdx: Int,
@@ -59,3 +70,206 @@ case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil,
isFailure: Boolean = false)
case class EdgeWithScore(edge: Edge, score: Double)
+
+
+
+
+
+/** result */
+
+object StepResult {
+
+ /** Flat sequence of scored edges, as held in a step result. */
+ type Values = Seq[S2EdgeWithScore]
+ /** Key under which edges are grouped; produced by Edge.selectValues in this file (presumably one entry per group-by key -- confirm against selectValues). */
+ type GroupByKey = Seq[Option[Any]]
+ /** Four-slot order-by tuple with every slot unset; the default for S2EdgeWithScore.orderByValues. */
+ val EmptyOrderByValues = (None, None, None, None)
+ /** Step result with no edges, no groups and no degree edges. */
+ val Empty = StepResult(Nil, Nil, Nil)
+
+
+ /**
+  * Concatenate two edge lists, then order and truncate them as one group.
+  *
+  * If the summed score of all edges is below `queryOption.scoreThreshold` the group is
+  * dropped and `(0.0, Nil)` is returned. Otherwise the edges are ordered via [[orderBy]],
+  * cut to `queryOption.groupBy.limit`, and returned with the score sum of the kept edges.
+  *
+  * @param left        first edge list
+  * @param right       second edge list, appended after `left`
+  * @param queryOption supplies the score threshold, ordering and group limit
+  * @return the score sum of the kept edges and the kept edges themselves
+  */
+ def mergeOrdered(left: StepResult.Values,
+                  right: StepResult.Values,
+                  queryOption: QueryOption): (Double, StepResult.Values) = {
+   def totalScore(edges: StepResult.Values): Double =
+     edges.foldLeft(0.0) { (acc, e) => acc + e.score }
+
+   val combined = left ++ right
+   if (totalScore(combined) < queryOption.scoreThreshold) {
+     (0.0, Nil)
+   } else {
+     val kept = orderBy(queryOption, combined).take(queryOption.groupBy.limit)
+     (totalScore(kept), kept)
+   }
+ }
+
+ /**
+  * Sort edges by their pre-computed `orderByValues`.
+  *
+  * Sorting only happens when `queryOption.withScore` is set and at least one order-by
+  * column is configured; otherwise the input is returned untouched.
+  *
+  * @param queryOption supplies the sort switch, the columns and the per-column direction
+  * @param notOrdered  edges to sort
+  * @return the sorted edges, or `notOrdered` itself when no sorting applies
+  */
+ def orderBy(queryOption: QueryOption, notOrdered: Values): Values = {
+   import OrderingUtil._
+
+   val sortRequested = queryOption.withScore && queryOption.orderByColumns.nonEmpty
+   if (!sortRequested) notOrdered
+   else {
+     val ordering = TupleMultiOrdering[Any](queryOption.ascendingVals)
+     notOrdered.sortBy(_.orderByValues)(ordering)
+   }
+ }
+ /**
+  * Build the four-slot order-by tuple for one edge.
+  *
+  * Each of the first four keys is resolved to a value; slots without a key are `None`.
+  * Keys beyond the fourth are ignored.
+  *
+  * @param s2Edge      edge whose timestamp and properties are read
+  * @param score       value used for the "score" key
+  * @param orderByKeys names of the order-by keys, in priority order
+  * @return the resolved values, padded with `None` to four slots
+  */
+ def toOrderByValues(s2Edge: Edge,
+                     score: Double,
+                     orderByKeys: Seq[String]): (Any, Any, Any, Any) = {
+   // "score" and the two timestamp aliases are special-cased; anything else is a property lookup.
+   val resolve: String => Any = {
+     case "score" => score
+     case "timestamp" | "_timestamp" => s2Edge.ts
+     case other => s2Edge.properties.get(other)
+   }
+
+   val slots = orderByKeys.take(4).map(resolve).padTo(4, None)
+   (slots(0), slots(1), slots(2), slots(3))
+ }
+ /**
+  * Merge multiple StepResult into one StepResult.
+  *
+  * Every edge of every step result is re-scored with that result's weight. The flat
+  * results are concatenated and ordered via [[orderBy]]; the per-result groups are merged
+  * by key, with both their edges and their score sums accumulated across step results.
+  * When `queryOption.groupBy.keys` is non-empty, the flat results are additionally grouped
+  * by those keys into the same aggregation. Groups are returned by descending score sum.
+  *
+  * @param queryOption      supplies the ordering and the global group-by keys
+  * @param multiStepResults step results to merge
+  * @param weights          score multipliers paired positionally with `multiStepResults`;
+  *                         a step result without a matching weight is merged with 1.0
+  * @return the merged step result, carrying the degree edges of all inputs
+  */
+ def merges(queryOption: QueryOption,
+            multiStepResults: Seq[StepResult],
+            weights: Seq[Double] = Nil): StepResult = {
+   val degrees = multiStepResults.flatMap(_.degreeEdges)
+   val ls = new mutable.ListBuffer[S2EdgeWithScore]()
+   val agg = new mutable.HashMap[GroupByKey, ListBuffer[S2EdgeWithScore]]()
+   val sums = new mutable.HashMap[GroupByKey, Double]()
+
+   // zip stops at the shorter side, so an empty or short `weights` would silently discard
+   // step results; pad the missing weights with the neutral multiplier instead.
+   val paddedWeights = weights.padTo(multiStepResults.size, 1.0)
+
+   for {
+     (weight, eachStepResult) <- paddedWeights.zip(multiStepResults)
+   } {
+     eachStepResult.results.foreach { t =>
+       ls += t.copy(score = t.score * weight)
+     }
+
+     // process each query's stepResult's grouped
+     for {
+       (groupByKey, (_, values)) <- eachStepResult.grouped
+     } {
+       val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore])
+       var weightedSum = 0.0
+       values.foreach { t =>
+         val newScore = t.score * weight
+         buffer += t.copy(score = newScore)
+         weightedSum += newScore
+       }
+       // accumulate: the same key may appear in several step results, and `agg` keeps
+       // all of their edges, so the sum must cover all of them as well.
+       sums += (groupByKey -> (sums.getOrElse(groupByKey, 0.0) + weightedSum))
+     }
+   }
+
+   // process global groupBy
+   if (queryOption.groupBy.keys.nonEmpty) {
+     for {
+       s2EdgeWithScore <- ls
+       groupByKey = s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys)
+     } {
+       val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore])
+       buffer += s2EdgeWithScore
+       val newScore = sums.getOrElse(groupByKey, 0.0) + s2EdgeWithScore.score
+       sums += (groupByKey -> newScore)
+     }
+   }
+
+   val ordered = orderBy(queryOption, ls)
+   val grouped = for {
+     (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1)
+     aggregated = agg(groupByKey) if aggregated.nonEmpty
+   } yield groupByKey -> (scoreSum, aggregated)
+
+   StepResult(results = ordered, grouped = grouped, degrees)
+ }
+
+ //TODO: Optimize this.
+ /**
+  * Remove from `baseStepResult` every edge whose "to" value also occurs in
+  * `filterOutStepInnerResult`.
+  *
+  * Both the flat results and the grouped results are filtered. A group's score sum is
+  * reduced by the scores of its removed edges, and groups left without edges are dropped.
+  * Degree edges are passed through unchanged.
+  *
+  * @param graph                    not used by this method
+  * @param queryOption              not consulted at present (see the note on `fields`)
+  * @param baseStepResult           result to filter
+  * @param filterOutStepInnerResult edges whose "to" values are to be excluded
+  * @return the filtered step result
+  */
+ def filterOut(graph: Graph,
+               queryOption: QueryOption,
+               baseStepResult: StepResult,
+               filterOutStepInnerResult: StepInnerResult): StepResult = {
+
+   // Always compare on "to". queryOption.filterOutFields is not consulted: the original
+   // code carried it only as a commented-out alternative, and both live branches were "to".
+   val fields = Seq("to")
+   val filterOutSet = filterOutStepInnerResult.edgesWithScoreLs.map { t =>
+     t.edge.selectValues(fields)
+   }.toSet
+
+   def isFilteredOut(t: S2EdgeWithScore): Boolean =
+     filterOutSet.contains(t.s2Edge.selectValues(fields))
+
+   val filteredResults = baseStepResult.results.filterNot(isFilteredOut)
+
+   val grouped = for {
+     (key, (scoreSum, values)) <- baseStepResult.grouped
+     (out, in) = values.partition(isFilteredOut)
+     newScoreSum = scoreSum - out.foldLeft(0.0) { case (prev, current) => prev + current.score } if in.nonEmpty
+   } yield key -> (newScoreSum, in)
+
+   StepResult(results = filteredResults, grouped = grouped, baseStepResult.degreeEdges)
+ }
+ /**
+  * Post-process a StepInnerResult into a StepResult.
+  *
+  * Each scored edge is wrapped in an S2EdgeWithScore carrying its order-by tuple (the
+  * score alone when no order-by column is configured) and its parent edges. The flat
+  * result is ordered via [[orderBy]]. When group-by keys are configured, the unordered
+  * edges are also grouped, each group is passed through [[mergeOrdered]], and surviving
+  * groups are returned by descending score sum.
+  *
+  * @param graph           not used by this method
+  * @param queryOption     supplies the order-by and group-by settings
+  * @param stepInnerResult raw traverse result to convert
+  * @return the ordered, optionally grouped step result
+  */
+ def apply(graph: Graph,
+           queryOption: QueryOption,
+           stepInnerResult: StepInnerResult): StepResult = {
+   logger.debug(s"[BeforePostProcess]: ${stepInnerResult.edgesWithScoreLs.size}")
+
+   val results = for {
+     edgeWithScore <- stepInnerResult.edgesWithScoreLs
+   } yield {
+     val edge = edgeWithScore.edge
+     val orderByValues =
+       if (queryOption.orderByColumns.isEmpty) (edgeWithScore.score, None, None, None)
+       else toOrderByValues(edge, edgeWithScore.score, queryOption.orderByKeys)
+
+     S2EdgeWithScore(edge, edgeWithScore.score, orderByValues, edgeWithScore.edge.parentEdges)
+   }
+   /** ordered flatten result */
+   val ordered = orderBy(queryOption, results)
+
+   /** ordered grouped result */
+   val grouped =
+     if (queryOption.groupBy.keys.isEmpty) Nil
+     else {
+       val agg = new mutable.HashMap[GroupByKey, (Double, Values)]()
+       // foreach, not map: the traversal only fills `agg`, so no result collection is needed.
+       results.groupBy { s2EdgeWithScore =>
+         s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys, useToString = true)
+       }.foreach { case (_, ls) =>
+         val (scoreSum, merged) = mergeOrdered(ls, Nil, queryOption)
+         /**
+          * watch out here. by calling toString on Any, we lose type information which will be used
+          * later for toJson.
+          */
+         // groups emptied by mergeOrdered (score below threshold, or limit 0) are skipped
+         merged.headOption.foreach { first =>
+           val newKey = first.s2Edge.selectValues(queryOption.groupBy.keys, useToString = false)
+           agg += (newKey -> (scoreSum, merged))
+         }
+       }
+       agg.toSeq.sortBy(_._2._1 * -1)
+     }
+
+   val degrees = stepInnerResult.degreeEdges.map(t => S2EdgeWithScore(t.edge, t.score))
+   StepResult(results = ordered, grouped = grouped, degreeEdges = degrees)
+ }
+}
+
+/**
+ * An edge together with its score and the data needed to order it.
+ *
+ * @param s2Edge        the edge itself
+ * @param score         score assigned to the edge
+ * @param orderByValues four-slot tuple used as the sort key; defaults to
+ *                      StepResult.EmptyOrderByValues
+ * @param parentEdges   scored parent edges of this edge; empty by default
+ */
+case class S2EdgeWithScore(
+ s2Edge: Edge,
+ score: Double,
+ orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues,
+ parentEdges: Seq[EdgeWithScore] = Nil
+)
+
+/**
+ * Outcome of a step after post-processing.
+ *
+ * @param results     flat list of scored edges
+ * @param grouped     groups as (key, (score sum, edges in the group)) pairs
+ * @param degreeEdges scored degree edges
+ */
+case class StepResult(
+ results: StepResult.Values,
+ grouped: Seq[(StepResult.GroupByKey, (Double, StepResult.Values))],
+ degreeEdges: StepResult.Values
+) {
+ /** True when the flat result list is empty; groups and degree edges are not considered. */
+ val isEmpty: Boolean = results.isEmpty
+}
\ No newline at end of file