You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by da...@apache.org on 2016/01/04 07:40:46 UTC
[13/46] incubator-s2graph git commit: handle json parse exception
handle json parse exception
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e607fc9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e607fc9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e607fc9b
Branch: refs/heads/feature/test_daewon
Commit: e607fc9b704ae30d0040dabff62c28ae68150941
Parents: d358931
Author: Jaesang Kim <ho...@gmail.com>
Authored: Wed Dec 23 11:24:17 2015 +0900
Committer: Jaesang Kim <ho...@gmail.com>
Committed: Wed Dec 23 11:25:12 2015 +0900
----------------------------------------------------------------------
.../counter/core/v2/RankingStorageGraph.scala | 41 ++++++++++++++++++++
.../s2/counter/core/CounterFunctions.scala | 12 ++++--
2 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e607fc9b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
index b0c0a41..b12efbd 100644
--- a/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
+++ b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
@@ -207,6 +207,46 @@ class RankingStorageGraph(config: Config) extends RankingStorage {
private def getEdges(key: RankingKey, duplicate: String="first"): Future[List[JsValue]] = {
val labelName = counterModel.findById(key.policyId).get.action + labelPostfix
+// val ids = (0 until BUCKET_SHARD_COUNT).map { idx =>
+// s"${makeBucketShardKey(idx, key)}"
+// }
+//
+// val payload = Json.obj(
+// "srcVertices" -> Json.arr(
+// Json.obj(
+// "serviceName" -> SERVICE_NAME,
+// "columnName" -> BUCKET_COLUMN_NAME,
+// "ids" -> ids
+// )
+// ),
+// "steps" -> Json.arr(
+// Json.obj(
+// "step" -> Json.arr(
+// Json.obj(
+// "label" -> labelName,
+// "duplicate" -> duplicate,
+// "direction" -> "out",
+// "offset" -> 0,
+// "limit" -> -1,
+// "interval" -> Json.obj(
+// "from" -> Json.obj(
+// "time_unit" -> key.eq.tq.q.toString,
+// "time_value" -> key.eq.tq.ts
+// ),
+// "to" -> Json.obj(
+// "time_unit" -> key.eq.tq.q.toString,
+// "time_value" -> key.eq.tq.ts
+// ),
+// "scoring" -> Json.obj(
+// "score" -> 1
+// )
+// )
+// )
+// )
+// )
+// )
+// )
+
val ids = {
(0 until BUCKET_SHARD_COUNT).map { shardIdx =>
s""""${makeBucketShardKey(shardIdx, key)}""""
@@ -246,6 +286,7 @@ class RankingStorageGraph(config: Config) extends RankingStorage {
log.debug(strJs)
val payload = Json.parse(strJs)
+
wsClient.url(s"$s2graphUrl/graphs/getEdges").post(payload).map { resp =>
resp.status match {
case HttpStatus.SC_OK =>
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e607fc9b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
index 32e3d0c..cc2e54b 100644
--- a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
+++ b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
@@ -128,6 +128,12 @@ object CounterFunctions extends Logging with WithKafka {
itemRankingRdd.unpersist(false)
}
}
+
+ private def parseLine(line: String): Option[TrxLog] = Try {
+ val js = Json.parse(line)
+ js.toString()
+ js.as[TrxLog]
+ }.toOption
def makeTrxLogRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[TrxLog] = {
rdd.mapPartitions { part =>
@@ -135,10 +141,8 @@ object CounterFunctions extends Logging with WithKafka {
for {
(k, v) <- part
line <- GraphUtil.parseString(v)
- trxLog = Json.parse(line).as[TrxLog] if trxLog.success
- } yield {
- trxLog
- }
+ trxLog <- parseLine(line) if trxLog.success
+ } yield trxLog
}
}