You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by da...@apache.org on 2016/01/04 07:40:46 UTC

[13/46] incubator-s2graph git commit: handle json parse exception

handle json parse exception


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e607fc9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e607fc9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e607fc9b

Branch: refs/heads/feature/test_daewon
Commit: e607fc9b704ae30d0040dabff62c28ae68150941
Parents: d358931
Author: Jaesang Kim <ho...@gmail.com>
Authored: Wed Dec 23 11:24:17 2015 +0900
Committer: Jaesang Kim <ho...@gmail.com>
Committed: Wed Dec 23 11:25:12 2015 +0900

----------------------------------------------------------------------
 .../counter/core/v2/RankingStorageGraph.scala   | 41 ++++++++++++++++++++
 .../s2/counter/core/CounterFunctions.scala      | 12 ++++--
 2 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e607fc9b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
index b0c0a41..b12efbd 100644
--- a/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
+++ b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
@@ -207,6 +207,46 @@ class RankingStorageGraph(config: Config) extends RankingStorage {
   private def getEdges(key: RankingKey, duplicate: String="first"): Future[List[JsValue]] = {
     val labelName = counterModel.findById(key.policyId).get.action + labelPostfix
 
+//    val ids = (0 until BUCKET_SHARD_COUNT).map { idx =>
+//      s"${makeBucketShardKey(idx, key)}"
+//    }
+//
+//    val payload = Json.obj(
+//      "srcVertices" -> Json.arr(
+//        Json.obj(
+//          "serviceName" -> SERVICE_NAME,
+//          "columnName" -> BUCKET_COLUMN_NAME,
+//          "ids" -> ids
+//        )
+//      ),
+//      "steps" -> Json.arr(
+//        Json.obj(
+//          "step" -> Json.arr(
+//            Json.obj(
+//              "label" -> labelName,
+//              "duplicate" -> duplicate,
+//              "direction" -> "out",
+//              "offset" -> 0,
+//              "limit" -> -1,
+//              "interval" -> Json.obj(
+//                "from" -> Json.obj(
+//                  "time_unit" -> key.eq.tq.q.toString,
+//                  "time_value" -> key.eq.tq.ts
+//                ),
+//                "to" -> Json.obj(
+//                  "time_unit" -> key.eq.tq.q.toString,
+//                  "time_value" -> key.eq.tq.ts
+//                ),
+//                "scoring" -> Json.obj(
+//                  "score" -> 1
+//                )
+//              )
+//            )
+//          )
+//        )
+//      )
+//    )
+
     val ids = {
       (0 until BUCKET_SHARD_COUNT).map { shardIdx =>
         s""""${makeBucketShardKey(shardIdx, key)}""""
@@ -246,6 +286,7 @@ class RankingStorageGraph(config: Config) extends RankingStorage {
     log.debug(strJs)
 
     val payload = Json.parse(strJs)
+
     wsClient.url(s"$s2graphUrl/graphs/getEdges").post(payload).map { resp =>
       resp.status match {
         case HttpStatus.SC_OK =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e607fc9b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
index 32e3d0c..cc2e54b 100644
--- a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
+++ b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
@@ -128,6 +128,12 @@ object CounterFunctions extends Logging with WithKafka {
       itemRankingRdd.unpersist(false)
     }
   }
+
+  private def parseLine(line: String): Option[TrxLog] = Try {
+    val js = Json.parse(line)
+    js.toString()
+    js.as[TrxLog]
+  }.toOption
   
   def makeTrxLogRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[TrxLog] = {
     rdd.mapPartitions { part =>
@@ -135,10 +141,8 @@ object CounterFunctions extends Logging with WithKafka {
       for {
         (k, v) <- part
         line <- GraphUtil.parseString(v)
-        trxLog = Json.parse(line).as[TrxLog] if trxLog.success
-      } yield {
-        trxLog
-      }
+        trxLog <- parseLine(line) if trxLog.success
+      } yield trxLog
     }
   }