You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/14 17:49:52 UTC

[01/12] incubator-s2graph git commit: add options variable on label model class.

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 6610285ad -> b5908311a


add options variable on label model class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/0e58f557
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/0e58f557
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/0e58f557

Branch: refs/heads/master
Commit: 0e58f55722b48517f0db4a99d8e823dff4fffb2d
Parents: 1c2bd04
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Mar 16 13:32:36 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 05:46:17 2016 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/core/Edge.scala    |  3 +-
 .../org/apache/s2graph/core/Management.scala    |  7 ++--
 .../org/apache/s2graph/core/mysqls/Label.scala  | 37 +++++++++-----------
 .../s2graph/core/rest/RequestParser.scala       |  3 +-
 .../s2graph/core/TestCommonWithModels.scala     | 12 +++----
 5 files changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0e58f557/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 0fab400..8813313 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -159,7 +159,8 @@ case class Edge(srcVertex: Vertex,
     } else {
       val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
       val base = copy(labelWithDir = outDir)
-      List(base, base.reverseSrcTgtEdge)
+      val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
+      if (skipReverse) List(base) else List(base, base.reverseSrcTgtEdge)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0e58f557/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index d5b4335..db2c5af 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -344,7 +344,8 @@ class Management(graph: Graph) {
                   hTableTTL: Option[Int],
                   schemaVersion: String = DEFAULT_VERSION,
                   isAsync: Boolean,
-                  compressionAlgorithm: String = "gz"): Try[Label] = {
+                  compressionAlgorithm: String = "gz",
+                  options: Option[String] = None): Try[Label] = {
 
     val labelOpt = Label.findByName(label, useCache = false)
 
@@ -358,7 +359,7 @@ class Management(graph: Graph) {
             srcServiceName, srcColumnName, srcColumnType,
             tgtServiceName, tgtColumnName, tgtColumnType,
             isDirected, serviceName, indices, props, consistencyLevel,
-            hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
+            hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
 
           /* create hbase table */
           val service = newLabel.service
@@ -395,6 +396,6 @@ class Management(graph: Graph) {
       old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
       old.isDirected, old.serviceName,
       allIndices, allProps,
-      old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm)
+      old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm, old.options)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0e58f557/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 90d3d67..ebbb4ba 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -23,7 +23,7 @@ import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.utils.logger
 import org.apache.s2graph.core.{GraphExceptions, GraphUtil, JSONParser}
-import play.api.libs.json.Json
+import play.api.libs.json.{JsValue, JsObject, Json}
 import scalikejdbc._
 
 object Label extends Model[Label] {
@@ -35,7 +35,8 @@ object Label extends Model[Label] {
       rs.int("src_service_id"), rs.string("src_column_name"), rs.string("src_column_type"),
       rs.int("tgt_service_id"), rs.string("tgt_column_name"), rs.string("tgt_column_type"),
       rs.boolean("is_directed"), rs.string("service_name"), rs.int("service_id"), rs.string("consistency_level"),
-      rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), rs.string("schema_version"), rs.boolean("is_async"), rs.string("compressionAlgorithm"))
+      rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), rs.string("schema_version"), rs.boolean("is_async"),
+      rs.string("compressionAlgorithm"), rs.stringOpt("options"))
   }
 
   def deleteAll(label: Label)(implicit session: DBSession) = {
@@ -72,7 +73,8 @@ object Label extends Model[Label] {
              hTableTTL: Option[Int],
              schemaVersion: String,
              isAsync: Boolean,
-             compressionAlgorithm: String)(implicit session: DBSession = AutoSession) = {
+             compressionAlgorithm: String,
+             options: Option[String])(implicit session: DBSession = AutoSession) = {
     sql"""
     	insert into labels(label,
     src_service_id, src_column_name, src_column_type,
@@ -156,7 +158,8 @@ object Label extends Model[Label] {
                 hTableTTL: Option[Int],
                 schemaVersion: String,
                 isAsync: Boolean,
-                compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Label = {
+                compressionAlgorithm: String,
+                options: Option[String])(implicit session: DBSession = AutoSession): Label = {
 
     val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
     val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
@@ -186,7 +189,8 @@ object Label extends Model[Label] {
 
           val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType,
             tgtServiceId, tgtColumnName, tgtColumnType, isDirected, serviceName, serviceId, consistencyLevel,
-            hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync, compressionAlgorithm).toInt
+            hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync,
+            compressionAlgorithm, options).toInt
 
           val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType) =>
             val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType)
@@ -262,7 +266,8 @@ case class Label(id: Option[Int], label: String,
                  isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong",
                  hTableName: String, hTableTTL: Option[Int],
                  schemaVersion: String, isAsync: Boolean = false,
-                 compressionAlgorithm: String) extends JSONParser {
+                 compressionAlgorithm: String,
+                 options: Option[String]) extends JSONParser {
   def metas = LabelMeta.findAllByLabelId(id.get)
 
   def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap
@@ -323,6 +328,11 @@ case class Label(id: Option[Int], label: String,
     jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
   } yield prop.name -> jsValue).toMap
 
+  lazy val extraOptions: Map[String, JsValue] = options match {
+    case None => Map.empty
+    case Some(v) => Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
+  }
+
   def srcColumnWithDir(dir: Int) = {
     if (dir == GraphUtil.directions("out")) srcColumn else tgtColumn
   }
@@ -353,26 +363,11 @@ case class Label(id: Option[Int], label: String,
     metaProps
   }
 
-  //  def srcColumnInnerVal(jsValue: JsValue) = {
-  //    jsValueToInnerVal(jsValue, srcColumnType, version)
-  //  }
-  //  def tgtColumnInnerVal(jsValue: JsValue) = {
-  //    jsValueToInnerVal(jsValue, tgtColumnType, version)
-  //  }
-
   override def toString(): String = {
     val orderByKeys = LabelMeta.findAllByLabelId(id.get)
     super.toString() + orderByKeys.toString()
   }
 
-  //  def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = {
-  //    if (scoring.isEmpty) LabelIndex.defaultSeq
-  //    else {
-  //      LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
-  //
-  ////      LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
-  //    }
-  //  }
   lazy val toJson = Json.obj("labelName" -> label,
     "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
     "isDirected" -> isDirected,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0e58f557/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index f67a89b..7fa9e34 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -559,10 +559,11 @@ class RequestParser(config: Config) extends JSONParser {
     val schemaVersion = (jsValue \ "schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION)
     val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false)
     val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)
+    val options = (jsValue \ "options").asOpt[JsValue].map(_.toString())
 
     (labelName, srcServiceName, srcColumnName, srcColumnType,
       tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName,
-      indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
+      indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
   }
 
   def toIndexElements(jsValue: JsValue) = Try {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0e58f557/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index c652e80..33a901d 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -134,22 +134,22 @@ trait TestCommonWithModels {
   def createTestLabel() = {
     implicit val session = AutoSession
     management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType,
-      isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4")
+      isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None)
 
     management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
-      isDirected = true, serviceNameV2, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
+      isDirected = true, serviceNameV2, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)
 
     management.createLabel(labelNameV3, serviceNameV3, columnNameV3, columnTypeV3, serviceNameV3, tgtColumnNameV3, tgtColumnTypeV3,
-      isDirected = true, serviceNameV3, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4")
+      isDirected = true, serviceNameV3, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4", None)
 
     management.createLabel(labelNameV4, serviceNameV4, columnNameV4, columnTypeV4, serviceNameV4, tgtColumnNameV4, tgtColumnTypeV4,
-      isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4")
+      isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4", None)
 
     management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
-      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4")
+      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None)
 
     management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
-      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
+      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)
   }
 
   def service = Service.findByName(serviceName, useCache = false).get


[07/12] incubator-s2graph git commit: Merge branch 'master' into S2GRAPH-125

Posted by st...@apache.org.
Merge branch 'master' into S2GRAPH-125


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c90fb0d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c90fb0d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c90fb0d6

Branch: refs/heads/master
Commit: c90fb0d602530972c0c12c456cb27200fcd8841e
Parents: 5d392f5 d37888f
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Nov 10 20:51:02 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 20:51:02 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../apache/s2graph/core/ExceptionHandler.scala  | 161 ++++++-------------
 .../apache/s2graph/core/storage/Storage.scala   |  16 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  |   1 +
 .../apache/s2graph/rest/play/Bootstrap.scala    |  35 ++--
 .../s2graph/rest/play/actors/QueueActor.scala   |  10 +-
 .../controllers/ApplicationController.scala     |   9 ++
 .../play/controllers/CounterController.scala    |   3 +-
 .../rest/play/controllers/EdgeController.scala  |  21 +--
 .../play/controllers/PublishController.scala    |   3 +-
 .../play/controllers/VertexController.scala     |   7 +-
 11 files changed, 110 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c90fb0d6/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------


[08/12] incubator-s2graph git commit: More advanced options on Label for publishing to Kafka.

Posted by st...@apache.org.
More advanced options on Label for publishing to Kafka.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3f313097
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3f313097
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3f313097

Branch: refs/heads/master
Commit: 3f3130978e013cf88d7cf30774401d7f2615316f
Parents: c90fb0d
Author: DO YUNG YOON <st...@apache.org>
Authored: Tue May 24 10:28:34 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 21:42:54 2016 +0900

----------------------------------------------------------------------
 .../s2graph/core/rest/RequestParser.scala       |  48 +++---
 .../core/Integrate/WeakLabelDeleteTest.scala    |   7 +-
 .../controllers/ApplicationController.scala     |   2 +
 .../rest/play/controllers/EdgeController.scala  | 164 +++++++++----------
 .../play/controllers/PublishController.scala    |   2 +-
 5 files changed, 108 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index e53ae5a..52ee50d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -456,43 +456,51 @@ class RequestParser(config: Config) {
       case arr: JsArray => arr.as[List[JsValue]]
       case _ => List.empty[JsValue]
     }
+  }
 
+  def jsToStr(js: JsValue): String = js match {
+    case JsString(s) => s
+    case _ => js.toString()
   }
 
-  def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], List[JsValue]) = {
-    val jsValues = toJsValues(jsValue)
-    val edges = jsValues.flatMap(toEdge(_, operation))
+  def parseBulkFormat(str: String): Seq[(GraphElement, String)] = {
+    val edgeStrs = str.split("\\n")
+    val elementsWithTsv = for {
+      edgeStr <- edgeStrs
+      str <- GraphUtil.parseString(edgeStr)
+      element <- Graph.toGraphElement(str)
+    } yield (element, str)
 
-    (edges, jsValues)
+    elementsWithTsv
   }
 
-  def toEdges(jsValue: JsValue, operation: String): List[Edge] = {
-    toJsValues(jsValue).flatMap { edgeJson =>
-      toEdge(edgeJson, operation)
-    }
+  def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(Edge, String)] = {
+    val jsValues = toJsValues(jsValue)
+    jsValues.flatMap(toEdgeWithTsv(_, operation))
   }
 
-
-  private def toEdge(jsValue: JsValue, operation: String): List[Edge] = {
-
-    def parseId(js: JsValue) = js match {
-      case s: JsString => s.as[String]
-      case o@_ => s"${o}"
-    }
-    val srcId = (jsValue \ "from").asOpt[JsValue].toList.map(parseId(_))
-    val tgtId = (jsValue \ "to").asOpt[JsValue].toList.map(parseId(_))
-    val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ srcId
-    val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ tgtId
+  private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, String)] = {
+    val srcId = (jsValue \ "from").asOpt[JsValue].map(jsToStr)
+    val tgtId = (jsValue \ "to").asOpt[JsValue].map(jsToStr)
+    val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(jsToStr)) ++ srcId
+    val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(tos => tos.map(jsToStr)) ++ tgtId
 
     val label = parse[String](jsValue, "label")
     val timestamp = parse[Long](jsValue, "timestamp")
     val direction = parseOption[String](jsValue, "direction").getOrElse("")
     val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
+
     for {
       srcId <- srcIds
       tgtId <- tgtIds
     } yield {
-      Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString)
+      val edge = Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString)
+      val tsv = (jsValue \ "direction").asOpt[String] match {
+        case None => Seq(timestamp, operation, "e", srcId, tgtId, label, props).mkString("\t")
+        case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, props, dir).mkString("\t")
+      }
+
+      (edge, tsv)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
index c0ab323..3f76d59 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
     /** expect 4 edges */
     (result \ "results").as[List[JsValue]].size should be(4)
     val edges = (result \ "results").as[List[JsObject]]
-    val edgesToStore = parser.toEdges(Json.toJson(edges), "delete")
+    val edgesToStore = parser.parseJsonFormat(Json.toJson(edges), "delete").map(_._1)
     val rets = graph.mutateEdges(edgesToStore, withWait = true)
     Await.result(rets, Duration(20, TimeUnit.MINUTES))
 
@@ -152,4 +152,3 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
   }
 
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
index b328c85..13639b9 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
@@ -67,6 +67,8 @@ object ApplicationController extends Controller {
     else NotFound
   }
 
+  def skipElement(isAsync: Boolean) = !isWriteFallbackHealthy || isAsync
+
   def toKafkaTopic(isAsync: Boolean) = {
     if (!isWriteFallbackHealthy) Config.KAFKA_FAIL_TOPIC
     else {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 5b2ef97..b78b778 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -30,159 +30,145 @@ import play.api.mvc.{Controller, Result}
 
 import scala.collection.Seq
 import scala.concurrent.Future
+import scala.util.Random
 
 object EdgeController extends Controller {
 
   import ApplicationController._
-  import ExceptionHandler._
   import play.api.libs.concurrent.Execution.Implicits._
 
   private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
   private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser
   private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
 
-  private def jsToStr(js: JsValue): String = js match {
-    case JsString(s) => s
-    case obj => obj.toString()
-  }
-  private def jsToStr(js: JsLookupResult): String = js.toOption.map(jsToStr).getOrElse("undefined")
-
-  def toTsv(jsValue: JsValue, op: String): String = {
-    val ts = jsToStr(jsValue \ "timestamp")
-    val from = jsToStr(jsValue \ "from")
-    val to = jsToStr(jsValue \ "to")
-    val label = jsToStr(jsValue \ "label")
-    val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
-
-    (jsValue \ "direction").asOpt[String] match {
-      case None => Seq(ts, op, "e", from, to, label, props).mkString("\t")
-      case Some(dir) => Seq(ts, op, "e", from, to, label, props, dir).mkString("\t")
+  private def enqueue(topic: String, elem: GraphElement, tsv: String) = {
+    val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, Option(tsv))
+    walLogHandler.enqueue(kafkaMessage)
+  }
+
+  private def publish(graphElem: GraphElement, tsv: String) = {
+    val kafkaTopic = toKafkaTopic(graphElem.isAsync)
+
+    graphElem match {
+      case v: Vertex => enqueue(kafkaTopic, graphElem, tsv)
+      case e: Edge =>
+        e.label.extraOptions.get("walLog") match {
+          case None => enqueue(kafkaTopic, e, tsv)
+          case Some(walLogOpt) =>
+            (walLogOpt \ "method").as[JsValue] match {
+              case JsString("drop") => // pass
+              case JsString("sample") =>
+                val rate = (walLogOpt \ "rate").as[Int]
+                if (Random.nextInt(100) < rate) enqueue(kafkaTopic, e, tsv)
+              case _ => enqueue(kafkaTopic, e, tsv)
+            }
+        }
     }
   }
 
-  def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
+  private def tryMutate(elementsWithTsv: Seq[(GraphElement, String)], withWait: Boolean): Future[Result] = {
     if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
-
     else {
       try {
-        logger.debug(s"$jsValue")
-        val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation)
-
-        for ((edge, orgJs) <- edges.zip(jsOrgs)) {
-          val kafkaTopic = toKafkaTopic(edge.isAsync)
-          val kafkaMessage = ExceptionHandler.toKafkaMessage(kafkaTopic, edge, Option(toTsv(orgJs, operation)))
-          walLogHandler.enqueue(kafkaMessage)
+        elementsWithTsv.foreach { case (graphElem, tsv) =>
+          publish(graphElem, tsv)
         }
 
-        val edgesToStore = edges.filterNot(e => e.isAsync)
-
-        if (withWait) {
-          val rets = s2.mutateEdges(edgesToStore, withWait = true)
-          rets.map(Json.toJson(_)).map(jsonResponse(_))
-        } else {
-          val rets = edgesToStore.map { edge => QueueActor.router ! edge; true }
-          Future.successful(jsonResponse(Json.toJson(rets)))
+        val elementsToStore = for {
+          (e, _tsv) <- elementsWithTsv if !skipElement(e.isAsync)
+        } yield e
+
+        if (elementsToStore.isEmpty) Future.successful(jsonResponse(JsArray()))
+        else {
+          if (withWait) {
+            val rets = s2.mutateElements(elementsToStore, withWait)
+            rets.map(Json.toJson(_)).map(jsonResponse(_))
+          } else {
+            val rets = elementsToStore.map { element => QueueActor.router ! element; true }
+            Future.successful(jsonResponse(Json.toJson(rets)))
+          }
         }
       } catch {
         case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
-        case e: Exception =>
-          logger.error(s"mutateAndPublish: $e", e)
+        case e: Throwable =>
+          logger.error(s"tryMutate: ${e.getMessage}", e)
           Future.successful(InternalServerError(s"${e.getStackTrace}"))
       }
     }
   }
 
-  def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] = {
-    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+  def mutateJsonFormat(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
+    logger.debug(s"$jsValue")
+    val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation)
+    tryMutate(edgesWithTsv, withWait)
+  }
 
+  def mutateBulkFormat(str: String, withWait: Boolean = false): Future[Result] = {
     logger.debug(s"$str")
-    val edgeStrs = str.split("\\n")
-
-    var vertexCnt = 0L
-    var edgeCnt = 0L
-    try {
-      val elements =
-        for (edgeStr <- edgeStrs; str <- GraphUtil.parseString(edgeStr); element <- Graph.toGraphElement(str)) yield {
-          element match {
-            case v: Vertex => vertexCnt += 1
-            case e: Edge => edgeCnt += 1
-          }
-          val kafkaTopic = toKafkaTopic(element.isAsync)
-          walLogHandler.enqueue(toKafkaMessage(kafkaTopic, element, Some(str)))
-          element
-        }
-
-      //FIXME:
-      val elementsToStore = elements.filterNot(e => e.isAsync)
-      if (withWait) {
-        val rets = s2.mutateElements(elementsToStore, withWait)
-        rets.map(Json.toJson(_)).map(jsonResponse(_))
-      } else {
-        val rets = elementsToStore.map { element => QueueActor.router ! element; true }
-        Future.successful(jsonResponse(Json.toJson(rets)))
-      }
-    } catch {
-      case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
-      case e: Throwable =>
-        logger.error(s"mutateAndPublish: $e", e)
-        Future.successful(InternalServerError(s"${e.getStackTrace}"))
-    }
+    val elementsWithTsv = requestParser.parseBulkFormat(str)
+    tryMutate(elementsWithTsv, withWait)
   }
 
   def mutateBulk() = withHeaderAsync(parse.text) { request =>
-    mutateAndPublish(request.body, withWait = false)
+    mutateBulkFormat(request.body, withWait = false)
   }
 
   def mutateBulkWithWait() = withHeaderAsync(parse.text) { request =>
-    mutateAndPublish(request.body, withWait = true)
+    mutateBulkFormat(request.body, withWait = true)
   }
 
   def inserts() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert")
+    mutateJsonFormat(request.body, "insert")
   }
 
   def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert", withWait = true)
+    mutateJsonFormat(request.body, "insert", withWait = true)
   }
 
   def insertsBulk() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insertBulk")
+    mutateJsonFormat(request.body, "insertBulk")
   }
 
   def deletes() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete")
+    mutateJsonFormat(request.body, "delete")
   }
 
   def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete", withWait = true)
+    mutateJsonFormat(request.body, "delete", withWait = true)
   }
 
   def updates() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "update")
+    mutateJsonFormat(request.body, "update")
   }
 
   def updatesWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "update", withWait = true)
+    mutateJsonFormat(request.body, "update", withWait = true)
   }
 
   def increments() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "increment")
+    mutateJsonFormat(request.body, "increment")
   }
 
   def incrementsWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "increment", withWait = true)
+    mutateJsonFormat(request.body, "increment", withWait = true)
   }
 
   def incrementCounts() = withHeaderAsync(jsonParser) { request =>
     val jsValue = request.body
-    val edges = requestParser.toEdges(jsValue, "incrementCount")
+    val edgesWithTsv = requestParser.parseJsonFormat(jsValue, "incrementCount")
 
-    s2.incrementCounts(edges, withWait = true).map { results =>
-      val json = results.map { case (isSuccess, resultCount) =>
-        Json.obj("success" -> isSuccess, "result" -> resultCount)
-      }
+    val edges = for {
+      (e, _tsv) <- edgesWithTsv if !skipElement(e.isAsync)
+    } yield e
 
-      jsonResponse(Json.toJson(json))
+    if (edges.isEmpty) Future.successful(jsonResponse(JsArray()))
+    else {
+      s2.incrementCounts(edges, withWait = true).map { results =>
+        val json = results.map { case (isSuccess, resultCount) =>
+          Json.obj("success" -> isSuccess, "result" -> resultCount)
+        }
+        jsonResponse(Json.toJson(json))
+      }
     }
   }
 
@@ -199,10 +185,8 @@ object EdgeController extends Controller {
         id <- ids
         label <- labels
       } yield {
-        val tsv = Seq(ts, "deleteAll", "e", jsToStr(id), jsToStr(id), label.label, "{}", direction).mkString("\t")
-        val topic = topicOpt.getOrElse {
-          if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
-        }
+        val tsv = Seq(ts, "deleteAll", "e", requestParser.jsToStr(id), requestParser.jsToStr(id), label.label, "{}", direction).mkString("\t")
+        val topic = topicOpt.getOrElse { toKafkaTopic(label.isAsync) }
 
         ExceptionHandler.toKafkaMessage(topic, tsv)
       }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
index 1a037ae..0260b7a 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
@@ -69,6 +69,6 @@ object PublishController extends Controller {
   //    }
   //  }
   def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request =>
-    EdgeController.mutateAndPublish(request.body)
+    EdgeController.mutateBulkFormat(request.body)
   }
 }


[10/12] incubator-s2graph git commit: Merge branch 'master' into S2GRAPH-125

Posted by st...@apache.org.
Merge branch 'master' into S2GRAPH-125


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/714c68c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/714c68c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/714c68c6

Branch: refs/heads/master
Commit: 714c68c6266efa58e9522cdc31c3f4763e9a7c81
Parents: 244a93a 6610285
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 11 12:34:36 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Nov 11 12:34:36 2016 +0900

----------------------------------------------------------------------
 CHANGES                 | 3 +++
 project/Publisher.scala | 8 +++++---
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[09/12] incubator-s2graph git commit: change ratio from Int to Double.

Posted by st...@apache.org.
change the WAL log sample rate from Int to Double.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/244a93a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/244a93a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/244a93a9

Branch: refs/heads/master
Commit: 244a93a99f545d019b016b567c2f35ff051c9548
Parents: 3f31309
Author: daewon <da...@apache.org>
Authored: Fri Jun 24 10:30:50 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 21:43:33 2016 +0900

----------------------------------------------------------------------
 .../apache/s2graph/rest/play/controllers/EdgeController.scala    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/244a93a9/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index b78b778..da88c3d 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -58,8 +58,8 @@ object EdgeController extends Controller {
             (walLogOpt \ "method").as[JsValue] match {
               case JsString("drop") => // pass
               case JsString("sample") =>
-                val rate = (walLogOpt \ "rate").as[Int]
-                if (Random.nextInt(100) < rate) enqueue(kafkaTopic, e, tsv)
+                val rate = (walLogOpt \ "rate").as[Double]
+                if (scala.util.Random.nextDouble() < rate) enqueue(kafkaTopic, e, tsv)
               case _ => enqueue(kafkaTopic, e, tsv)
             }
         }


[06/12] incubator-s2graph git commit: Merge branch 'master' into S2GRAPH-125

Posted by st...@apache.org.
Merge branch 'master' into S2GRAPH-125


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/5d392f54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/5d392f54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/5d392f54

Branch: refs/heads/master
Commit: 5d392f54ca960f5234b3e473eebf38f85be94cb3
Parents: 83ffcd5 b58ba20
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Nov 10 10:09:45 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 10:09:45 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../loader/subscriber/TransferToHFile.scala     |   4 +-
 .../scala/org/apache/s2graph/core/Edge.scala    |  10 +-
 .../org/apache/s2graph/core/JSONParser.scala    | 235 +++++++++++++++++--
 .../org/apache/s2graph/core/Management.scala    |   3 +-
 .../org/apache/s2graph/core/PostProcess.scala   |   4 +-
 .../org/apache/s2graph/core/mysqls/Label.scala  |   9 +-
 .../apache/s2graph/core/mysqls/LabelMeta.scala  |   4 +-
 .../s2graph/core/mysqls/ServiceColumn.scala     |   2 +-
 .../s2graph/core/parsers/WhereParser.scala      |   6 +-
 .../s2graph/core/rest/RequestParser.scala       |   4 +-
 .../apache/s2graph/core/JsonParserTest.scala    |   3 +-
 .../org/apache/s2graph/rest/netty/Server.scala  |   2 +-
 .../rest/play/controllers/QueryController.scala |   2 +-
 14 files changed, 243 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5d392f54/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5d392f54/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5d392f54/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 7d4eeee,760816e..2734211
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@@ -19,11 -19,13 +19,13 @@@
  
  package org.apache.s2graph.core.mysqls
  
+ 
  import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
+ import org.apache.s2graph.core.GraphUtil
  import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
  import org.apache.s2graph.core.utils.logger
- import org.apache.s2graph.core.{GraphExceptions, GraphUtil, JSONParser}
- import play.api.libs.json.{JsValue, JsObject, Json}
+ import org.apache.s2graph.core.JSONParser._
 -import play.api.libs.json.Json
++import play.api.libs.json._
  import scalikejdbc._
  
  object Label extends Model[Label] {
@@@ -267,8 -264,7 +269,9 @@@ case class Label(id: Option[Int], label
                   isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong",
                   hTableName: String, hTableTTL: Option[Int],
                   schemaVersion: String, isAsync: Boolean = false,
 -                 compressionAlgorithm: String) {
 +                 compressionAlgorithm: String,
-                  options: Option[String]) extends JSONParser {
++                 options: Option[String]) {
++
    def metas = LabelMeta.findAllByLabelId(id.get)
  
    def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5d392f54/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------


[05/12] incubator-s2graph git commit: Change default action

Posted by st...@apache.org.
Change default action


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/83ffcd5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/83ffcd5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/83ffcd5c

Branch: refs/heads/master
Commit: 83ffcd5c947ddcc1e566b1134143ffe50daece77
Parents: b5a44d9
Author: daewon <da...@apache.org>
Authored: Mon May 2 15:40:44 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 08:01:41 2016 +0900

----------------------------------------------------------------------
 .../apache/s2graph/core/storage/Storage.scala   | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/83ffcd5c/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index f5cc4f4..2d7d198 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -1322,16 +1322,18 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     }
   }
 
-  def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] =
-    edge.label.extraOptions.get("skipVertex") match {
-      case Some(v) if v == false =>
-        if (edge.op == GraphUtil.operations("delete"))
-          buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
-        else
-          vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
-      case _ => Seq.empty
-    }
+  def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] = {
+    val storeVertex = edge.label.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false)
 
+    if (storeVertex) {
+      if (edge.op == GraphUtil.operations("delete"))
+        buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
+      else
+        vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
+    } else {
+      Seq.empty
+    }
+  }
 
   def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = {
     vertex.op match {


[02/12] incubator-s2graph git commit: bug fix.

Posted by st...@apache.org.
bug fix: add the missing options column and value to the Label insert statement.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4bce75bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4bce75bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4bce75bc

Branch: refs/heads/master
Commit: 4bce75bc4c7822b8582503c1d4b83f80d20764fe
Parents: 0e58f55
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Mar 16 16:04:29 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 05:47:12 2016 +0900

----------------------------------------------------------------------
 .../src/main/scala/org/apache/s2graph/core/mysqls/Label.scala   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4bce75bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index ebbb4ba..7d4eeee 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -79,12 +79,13 @@ object Label extends Model[Label] {
     	insert into labels(label,
     src_service_id, src_column_name, src_column_type,
     tgt_service_id, tgt_column_name, tgt_column_type,
-    is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, is_async, compressionAlgorithm)
+    is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, is_async,
+    compressionAlgorithm, options)
     	values (${label},
     ${srcServiceId}, ${srcColumnName}, ${srcColumnType},
     ${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType},
     ${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, ${hTableName}, ${hTableTTL},
-    ${schemaVersion}, ${isAsync}, ${compressionAlgorithm})
+    ${schemaVersion}, ${isAsync}, ${compressionAlgorithm}, ${options})
     """
       .updateAndReturnGeneratedKey.apply()
   }


[04/12] incubator-s2graph git commit: skip storing vertex's belongsTo property when insert edge unless label's extra option is set as {"skipVertex": false}.

Posted by st...@apache.org.
Skip storing the vertex's belongsTo property when inserting an edge, unless the label's extra option is set to {"skipVertex": false}.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b5a44d93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b5a44d93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b5a44d93

Branch: refs/heads/master
Commit: b5a44d9321c46f2d4c3296ea6d05e0198bd06e3c
Parents: 03af01e
Author: DO YUNG YOON <st...@apache.org>
Authored: Tue Mar 29 17:34:20 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 06:05:37 2016 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/storage/Storage.scala     | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5a44d93/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 70e47a7..f5cc4f4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -326,6 +326,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
         val (_, edgeUpdate) =
           if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge)
           else Edge.buildOperation(None, Seq(edge))
+
         buildVertexPutsAsync(edge) ++ indexedEdgeMutations(edgeUpdate) ++
           snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
       }
@@ -1322,10 +1323,15 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
   }
 
   def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] =
-    if (edge.op == GraphUtil.operations("delete"))
-      buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
-    else
-      vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
+    edge.label.extraOptions.get("skipVertex") match {
+      case Some(v) if v == false =>
+        if (edge.op == GraphUtil.operations("delete"))
+          buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
+        else
+          vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
+      case _ => Seq.empty
+    }
+
 
   def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = {
     vertex.op match {


[03/12] incubator-s2graph git commit: bug fix on skipReverse options.

Posted by st...@apache.org.
bug fix on the skipReverse option: apply it to directed labels instead of undirected ones.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/03af01e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/03af01e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/03af01e1

Branch: refs/heads/master
Commit: 03af01e1928b0370a19db82b9c863abc56360e94
Parents: 4bce75b
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Mar 16 17:02:03 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 05:48:04 2016 +0900

----------------------------------------------------------------------
 s2core/src/main/scala/org/apache/s2graph/core/Edge.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/03af01e1/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 8813313..cb7b820 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -155,12 +155,12 @@ case class Edge(srcVertex: Vertex,
 
   def relatedEdges = {
     if (labelWithDir.isDirected) {
-      List(this, duplicateEdge)
+      val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
+      if (skipReverse) List(this) else List(this, duplicateEdge)
     } else {
       val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
       val base = copy(labelWithDir = outDir)
-      val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
-      if (skipReverse) List(base) else List(base, base.reverseSrcTgtEdge)
+      List(base, base.reverseSrcTgtEdge)
     }
   }
 


[12/12] incubator-s2graph git commit: [S2GRAPH-125]: Add options field on Label model for controlling advanced options.

Posted by st...@apache.org.
[S2GRAPH-125]: Add options field on Label model for controlling advanced options.

JIRA:
  [S2GRAPH-125] https://issues.apache.org/jira/browse/S2GRAPH-125

Pull Request:
  Closes #94

Authors
  DO YUNG YOON: steamshon@apache.org


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b5908311
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b5908311
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b5908311

Branch: refs/heads/master
Commit: b5908311a8c8e4dded5ed8694cd5780d6e622a80
Parents: 33f7b69
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Nov 14 18:48:24 2016 +0100
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Nov 14 18:48:58 2016 +0100

----------------------------------------------------------------------
 CHANGES | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5908311/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 148edde..0c8edc3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -43,6 +43,8 @@ Release 0.1.0 - unreleased
     S2GRAPH-12: Add Label Name Swap Feature.
 		(Contributed by Hyunsung Jo<hy...@gmail.com>, committed by DOYUNG YOON).
 
+    S2GRAPH-125: Add options field on Label model for controlling advanced options (Committed by DOYUNG YOON).
+
   IMPROVEMENT
 
     S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON).


[11/12] incubator-s2graph git commit: close HBaseAdmin resource when createTable finished.

Posted by st...@apache.org.
Close the HBaseAdmin resource when createTable finishes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/33f7b69b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/33f7b69b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/33f7b69b

Branch: refs/heads/master
Commit: 33f7b69bfd7e8b51638328aef579052fecdb8eec
Parents: 714c68c
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Nov 14 18:47:14 2016 +0100
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Nov 14 18:47:14 2016 +0100

----------------------------------------------------------------------
 .../core/storage/hbase/AsynchbaseStorage.scala  | 55 +++++++++++---------
 1 file changed, 30 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/33f7b69b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 7391259..138216b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -380,33 +380,38 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
     logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
     val admin = getAdmin(zkAddr)
     val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
-    if (!admin.tableExists(TableName.valueOf(tableName))) {
-      try {
-        val desc = new HTableDescriptor(TableName.valueOf(tableName))
-        desc.setDurability(Durability.ASYNC_WAL)
-        for (cf <- cfs) {
-          val columnDesc = new HColumnDescriptor(cf)
-            .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
-            .setBloomFilterType(BloomType.ROW)
-            .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-            .setMaxVersions(1)
-            .setTimeToLive(2147483647)
-            .setMinVersions(0)
-            .setBlocksize(32768)
-            .setBlockCacheEnabled(true)
-          if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
-          desc.addFamily(columnDesc)
-        }
+    try {
+      if (!admin.tableExists(TableName.valueOf(tableName))) {
+        try {
+          val desc = new HTableDescriptor(TableName.valueOf(tableName))
+          desc.setDurability(Durability.ASYNC_WAL)
+          for (cf <- cfs) {
+            val columnDesc = new HColumnDescriptor(cf)
+              .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+              .setBloomFilterType(BloomType.ROW)
+              .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+              .setMaxVersions(1)
+              .setTimeToLive(2147483647)
+              .setMinVersions(0)
+              .setBlocksize(32768)
+              .setBlockCacheEnabled(true)
+            if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+            desc.addFamily(columnDesc)
+          }
 
-        if (regionCount <= 1) admin.createTable(desc)
-        else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
-      } catch {
-        case e: Throwable =>
-          logger.error(s"$zkAddr, $tableName failed with $e", e)
-          throw e
+          if (regionCount <= 1) admin.createTable(desc)
+          else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+        } catch {
+          case e: Throwable =>
+            logger.error(s"$zkAddr, $tableName failed with $e", e)
+            throw e
+        }
+      } else {
+        logger.info(s"$zkAddr, $tableName, $cfs already exist.")
       }
-    } else {
-      logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+    } finally {
+      admin.close()
+      admin.getConnection.close()
     }
   }