You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/14 17:49:52 UTC
[01/12] incubator-s2graph git commit: add options variable on label
model class.
Repository: incubator-s2graph
Updated Branches:
refs/heads/master 6610285ad -> b5908311a
add options variable on label model class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/0e58f557
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/0e58f557
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/0e58f557
Branch: refs/heads/master
Commit: 0e58f55722b48517f0db4a99d8e823dff4fffb2d
Parents: 1c2bd04
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Mar 16 13:32:36 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 05:46:17 2016 +0900
----------------------------------------------------------------------
.../scala/org/apache/s2graph/core/Edge.scala | 3 +-
.../org/apache/s2graph/core/Management.scala | 7 ++--
.../org/apache/s2graph/core/mysqls/Label.scala | 37 +++++++++-----------
.../s2graph/core/rest/RequestParser.scala | 3 +-
.../s2graph/core/TestCommonWithModels.scala | 12 +++----
5 files changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0e58f557/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 0fab400..8813313 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -159,7 +159,8 @@ case class Edge(srcVertex: Vertex,
} else {
val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
val base = copy(labelWithDir = outDir)
- List(base, base.reverseSrcTgtEdge)
+ val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
+ if (skipReverse) List(base) else List(base, base.reverseSrcTgtEdge)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0e58f557/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index d5b4335..db2c5af 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -344,7 +344,8 @@ class Management(graph: Graph) {
hTableTTL: Option[Int],
schemaVersion: String = DEFAULT_VERSION,
isAsync: Boolean,
- compressionAlgorithm: String = "gz"): Try[Label] = {
+ compressionAlgorithm: String = "gz",
+ options: Option[String] = None): Try[Label] = {
val labelOpt = Label.findByName(label, useCache = false)
@@ -358,7 +359,7 @@ class Management(graph: Graph) {
srcServiceName, srcColumnName, srcColumnType,
tgtServiceName, tgtColumnName, tgtColumnType,
isDirected, serviceName, indices, props, consistencyLevel,
- hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
+ hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
/* create hbase table */
val service = newLabel.service
@@ -395,6 +396,6 @@ class Management(graph: Graph) {
old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
old.isDirected, old.serviceName,
allIndices, allProps,
- old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm)
+ old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm, old.options)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0e58f557/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 90d3d67..ebbb4ba 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -23,7 +23,7 @@ import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.core.{GraphExceptions, GraphUtil, JSONParser}
-import play.api.libs.json.Json
+import play.api.libs.json.{JsValue, JsObject, Json}
import scalikejdbc._
object Label extends Model[Label] {
@@ -35,7 +35,8 @@ object Label extends Model[Label] {
rs.int("src_service_id"), rs.string("src_column_name"), rs.string("src_column_type"),
rs.int("tgt_service_id"), rs.string("tgt_column_name"), rs.string("tgt_column_type"),
rs.boolean("is_directed"), rs.string("service_name"), rs.int("service_id"), rs.string("consistency_level"),
- rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), rs.string("schema_version"), rs.boolean("is_async"), rs.string("compressionAlgorithm"))
+ rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), rs.string("schema_version"), rs.boolean("is_async"),
+ rs.string("compressionAlgorithm"), rs.stringOpt("options"))
}
def deleteAll(label: Label)(implicit session: DBSession) = {
@@ -72,7 +73,8 @@ object Label extends Model[Label] {
hTableTTL: Option[Int],
schemaVersion: String,
isAsync: Boolean,
- compressionAlgorithm: String)(implicit session: DBSession = AutoSession) = {
+ compressionAlgorithm: String,
+ options: Option[String])(implicit session: DBSession = AutoSession) = {
sql"""
insert into labels(label,
src_service_id, src_column_name, src_column_type,
@@ -156,7 +158,8 @@ object Label extends Model[Label] {
hTableTTL: Option[Int],
schemaVersion: String,
isAsync: Boolean,
- compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Label = {
+ compressionAlgorithm: String,
+ options: Option[String])(implicit session: DBSession = AutoSession): Label = {
val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
@@ -186,7 +189,8 @@ object Label extends Model[Label] {
val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType,
tgtServiceId, tgtColumnName, tgtColumnType, isDirected, serviceName, serviceId, consistencyLevel,
- hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync, compressionAlgorithm).toInt
+ hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync,
+ compressionAlgorithm, options).toInt
val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType) =>
val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType)
@@ -262,7 +266,8 @@ case class Label(id: Option[Int], label: String,
isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong",
hTableName: String, hTableTTL: Option[Int],
schemaVersion: String, isAsync: Boolean = false,
- compressionAlgorithm: String) extends JSONParser {
+ compressionAlgorithm: String,
+ options: Option[String]) extends JSONParser {
def metas = LabelMeta.findAllByLabelId(id.get)
def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap
@@ -323,6 +328,11 @@ case class Label(id: Option[Int], label: String,
jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
} yield prop.name -> jsValue).toMap
+ lazy val extraOptions: Map[String, JsValue] = options match {
+ case None => Map.empty
+ case Some(v) => Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
+ }
+
def srcColumnWithDir(dir: Int) = {
if (dir == GraphUtil.directions("out")) srcColumn else tgtColumn
}
@@ -353,26 +363,11 @@ case class Label(id: Option[Int], label: String,
metaProps
}
- // def srcColumnInnerVal(jsValue: JsValue) = {
- // jsValueToInnerVal(jsValue, srcColumnType, version)
- // }
- // def tgtColumnInnerVal(jsValue: JsValue) = {
- // jsValueToInnerVal(jsValue, tgtColumnType, version)
- // }
-
override def toString(): String = {
val orderByKeys = LabelMeta.findAllByLabelId(id.get)
super.toString() + orderByKeys.toString()
}
- // def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = {
- // if (scoring.isEmpty) LabelIndex.defaultSeq
- // else {
- // LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
- //
- //// LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
- // }
- // }
lazy val toJson = Json.obj("labelName" -> label,
"from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
"isDirected" -> isDirected,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0e58f557/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index f67a89b..7fa9e34 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -559,10 +559,11 @@ class RequestParser(config: Config) extends JSONParser {
val schemaVersion = (jsValue \ "schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION)
val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false)
val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)
+ val options = (jsValue \ "options").asOpt[JsValue].map(_.toString())
(labelName, srcServiceName, srcColumnName, srcColumnType,
tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName,
- indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
+ indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
}
def toIndexElements(jsValue: JsValue) = Try {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0e58f557/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index c652e80..33a901d 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -134,22 +134,22 @@ trait TestCommonWithModels {
def createTestLabel() = {
implicit val session = AutoSession
management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType,
- isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4")
+ isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None)
management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
- isDirected = true, serviceNameV2, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
+ isDirected = true, serviceNameV2, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)
management.createLabel(labelNameV3, serviceNameV3, columnNameV3, columnTypeV3, serviceNameV3, tgtColumnNameV3, tgtColumnTypeV3,
- isDirected = true, serviceNameV3, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4")
+ isDirected = true, serviceNameV3, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4", None)
management.createLabel(labelNameV4, serviceNameV4, columnNameV4, columnTypeV4, serviceNameV4, tgtColumnNameV4, tgtColumnTypeV4,
- isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4")
+ isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4", None)
management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
- isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4")
+ isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None)
management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
- isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
+ isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)
}
def service = Service.findByName(serviceName, useCache = false).get
[07/12] incubator-s2graph git commit: Merge branch 'master' into
S2GRAPH-125
Posted by st...@apache.org.
Merge branch 'master' into S2GRAPH-125
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c90fb0d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c90fb0d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c90fb0d6
Branch: refs/heads/master
Commit: c90fb0d602530972c0c12c456cb27200fcd8841e
Parents: 5d392f5 d37888f
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Nov 10 20:51:02 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 20:51:02 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../apache/s2graph/core/ExceptionHandler.scala | 161 ++++++-------------
.../apache/s2graph/core/storage/Storage.scala | 16 +-
.../core/storage/hbase/AsynchbaseStorage.scala | 1 +
.../apache/s2graph/rest/play/Bootstrap.scala | 35 ++--
.../s2graph/rest/play/actors/QueueActor.scala | 10 +-
.../controllers/ApplicationController.scala | 9 ++
.../play/controllers/CounterController.scala | 3 +-
.../rest/play/controllers/EdgeController.scala | 21 +--
.../play/controllers/PublishController.scala | 3 +-
.../play/controllers/VertexController.scala | 7 +-
11 files changed, 110 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c90fb0d6/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
[08/12] incubator-s2graph git commit: More advanced options on Label
for publishing to Kafka.
Posted by st...@apache.org.
More advanced options on Label for publishing to Kafka.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3f313097
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3f313097
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3f313097
Branch: refs/heads/master
Commit: 3f3130978e013cf88d7cf30774401d7f2615316f
Parents: c90fb0d
Author: DO YUNG YOON <st...@apache.org>
Authored: Tue May 24 10:28:34 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 21:42:54 2016 +0900
----------------------------------------------------------------------
.../s2graph/core/rest/RequestParser.scala | 48 +++---
.../core/Integrate/WeakLabelDeleteTest.scala | 7 +-
.../controllers/ApplicationController.scala | 2 +
.../rest/play/controllers/EdgeController.scala | 164 +++++++++----------
.../play/controllers/PublishController.scala | 2 +-
5 files changed, 108 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index e53ae5a..52ee50d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -456,43 +456,51 @@ class RequestParser(config: Config) {
case arr: JsArray => arr.as[List[JsValue]]
case _ => List.empty[JsValue]
}
+ }
+ def jsToStr(js: JsValue): String = js match {
+ case JsString(s) => s
+ case _ => js.toString()
}
- def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], List[JsValue]) = {
- val jsValues = toJsValues(jsValue)
- val edges = jsValues.flatMap(toEdge(_, operation))
+ def parseBulkFormat(str: String): Seq[(GraphElement, String)] = {
+ val edgeStrs = str.split("\\n")
+ val elementsWithTsv = for {
+ edgeStr <- edgeStrs
+ str <- GraphUtil.parseString(edgeStr)
+ element <- Graph.toGraphElement(str)
+ } yield (element, str)
- (edges, jsValues)
+ elementsWithTsv
}
- def toEdges(jsValue: JsValue, operation: String): List[Edge] = {
- toJsValues(jsValue).flatMap { edgeJson =>
- toEdge(edgeJson, operation)
- }
+ def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(Edge, String)] = {
+ val jsValues = toJsValues(jsValue)
+ jsValues.flatMap(toEdgeWithTsv(_, operation))
}
-
- private def toEdge(jsValue: JsValue, operation: String): List[Edge] = {
-
- def parseId(js: JsValue) = js match {
- case s: JsString => s.as[String]
- case o@_ => s"${o}"
- }
- val srcId = (jsValue \ "from").asOpt[JsValue].toList.map(parseId(_))
- val tgtId = (jsValue \ "to").asOpt[JsValue].toList.map(parseId(_))
- val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ srcId
- val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ tgtId
+ private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, String)] = {
+ val srcId = (jsValue \ "from").asOpt[JsValue].map(jsToStr)
+ val tgtId = (jsValue \ "to").asOpt[JsValue].map(jsToStr)
+ val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(jsToStr)) ++ srcId
+ val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(tos => tos.map(jsToStr)) ++ tgtId
val label = parse[String](jsValue, "label")
val timestamp = parse[Long](jsValue, "timestamp")
val direction = parseOption[String](jsValue, "direction").getOrElse("")
val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
+
for {
srcId <- srcIds
tgtId <- tgtIds
} yield {
- Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString)
+ val edge = Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString)
+ val tsv = (jsValue \ "direction").asOpt[String] match {
+ case None => Seq(timestamp, operation, "e", srcId, tgtId, label, props).mkString("\t")
+ case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, props, dir).mkString("\t")
+ }
+
+ (edge, tsv)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
index c0ab323..3f76d59 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
/** expect 4 edges */
(result \ "results").as[List[JsValue]].size should be(4)
val edges = (result \ "results").as[List[JsObject]]
- val edgesToStore = parser.toEdges(Json.toJson(edges), "delete")
+ val edgesToStore = parser.parseJsonFormat(Json.toJson(edges), "delete").map(_._1)
val rets = graph.mutateEdges(edgesToStore, withWait = true)
Await.result(rets, Duration(20, TimeUnit.MINUTES))
@@ -152,4 +152,3 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
}
}
-
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
index b328c85..13639b9 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
@@ -67,6 +67,8 @@ object ApplicationController extends Controller {
else NotFound
}
+ def skipElement(isAsync: Boolean) = !isWriteFallbackHealthy || isAsync
+
def toKafkaTopic(isAsync: Boolean) = {
if (!isWriteFallbackHealthy) Config.KAFKA_FAIL_TOPIC
else {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 5b2ef97..b78b778 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -30,159 +30,145 @@ import play.api.mvc.{Controller, Result}
import scala.collection.Seq
import scala.concurrent.Future
+import scala.util.Random
object EdgeController extends Controller {
import ApplicationController._
- import ExceptionHandler._
import play.api.libs.concurrent.Execution.Implicits._
private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser
private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
- private def jsToStr(js: JsValue): String = js match {
- case JsString(s) => s
- case obj => obj.toString()
- }
- private def jsToStr(js: JsLookupResult): String = js.toOption.map(jsToStr).getOrElse("undefined")
-
- def toTsv(jsValue: JsValue, op: String): String = {
- val ts = jsToStr(jsValue \ "timestamp")
- val from = jsToStr(jsValue \ "from")
- val to = jsToStr(jsValue \ "to")
- val label = jsToStr(jsValue \ "label")
- val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
-
- (jsValue \ "direction").asOpt[String] match {
- case None => Seq(ts, op, "e", from, to, label, props).mkString("\t")
- case Some(dir) => Seq(ts, op, "e", from, to, label, props, dir).mkString("\t")
+ private def enqueue(topic: String, elem: GraphElement, tsv: String) = {
+ val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, Option(tsv))
+ walLogHandler.enqueue(kafkaMessage)
+ }
+
+ private def publish(graphElem: GraphElement, tsv: String) = {
+ val kafkaTopic = toKafkaTopic(graphElem.isAsync)
+
+ graphElem match {
+ case v: Vertex => enqueue(kafkaTopic, graphElem, tsv)
+ case e: Edge =>
+ e.label.extraOptions.get("walLog") match {
+ case None => enqueue(kafkaTopic, e, tsv)
+ case Some(walLogOpt) =>
+ (walLogOpt \ "method").as[JsValue] match {
+ case JsString("drop") => // pass
+ case JsString("sample") =>
+ val rate = (walLogOpt \ "rate").as[Int]
+ if (Random.nextInt(100) < rate) enqueue(kafkaTopic, e, tsv)
+ case _ => enqueue(kafkaTopic, e, tsv)
+ }
+ }
}
}
- def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
+ private def tryMutate(elementsWithTsv: Seq[(GraphElement, String)], withWait: Boolean): Future[Result] = {
if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
-
else {
try {
- logger.debug(s"$jsValue")
- val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation)
-
- for ((edge, orgJs) <- edges.zip(jsOrgs)) {
- val kafkaTopic = toKafkaTopic(edge.isAsync)
- val kafkaMessage = ExceptionHandler.toKafkaMessage(kafkaTopic, edge, Option(toTsv(orgJs, operation)))
- walLogHandler.enqueue(kafkaMessage)
+ elementsWithTsv.foreach { case (graphElem, tsv) =>
+ publish(graphElem, tsv)
}
- val edgesToStore = edges.filterNot(e => e.isAsync)
-
- if (withWait) {
- val rets = s2.mutateEdges(edgesToStore, withWait = true)
- rets.map(Json.toJson(_)).map(jsonResponse(_))
- } else {
- val rets = edgesToStore.map { edge => QueueActor.router ! edge; true }
- Future.successful(jsonResponse(Json.toJson(rets)))
+ val elementsToStore = for {
+ (e, _tsv) <- elementsWithTsv if !skipElement(e.isAsync)
+ } yield e
+
+ if (elementsToStore.isEmpty) Future.successful(jsonResponse(JsArray()))
+ else {
+ if (withWait) {
+ val rets = s2.mutateElements(elementsToStore, withWait)
+ rets.map(Json.toJson(_)).map(jsonResponse(_))
+ } else {
+ val rets = elementsToStore.map { element => QueueActor.router ! element; true }
+ Future.successful(jsonResponse(Json.toJson(rets)))
+ }
}
} catch {
case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
- case e: Exception =>
- logger.error(s"mutateAndPublish: $e", e)
+ case e: Throwable =>
+ logger.error(s"tryMutate: ${e.getMessage}", e)
Future.successful(InternalServerError(s"${e.getStackTrace}"))
}
}
}
- def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] = {
- if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+ def mutateJsonFormat(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
+ logger.debug(s"$jsValue")
+ val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation)
+ tryMutate(edgesWithTsv, withWait)
+ }
+ def mutateBulkFormat(str: String, withWait: Boolean = false): Future[Result] = {
logger.debug(s"$str")
- val edgeStrs = str.split("\\n")
-
- var vertexCnt = 0L
- var edgeCnt = 0L
- try {
- val elements =
- for (edgeStr <- edgeStrs; str <- GraphUtil.parseString(edgeStr); element <- Graph.toGraphElement(str)) yield {
- element match {
- case v: Vertex => vertexCnt += 1
- case e: Edge => edgeCnt += 1
- }
- val kafkaTopic = toKafkaTopic(element.isAsync)
- walLogHandler.enqueue(toKafkaMessage(kafkaTopic, element, Some(str)))
- element
- }
-
- //FIXME:
- val elementsToStore = elements.filterNot(e => e.isAsync)
- if (withWait) {
- val rets = s2.mutateElements(elementsToStore, withWait)
- rets.map(Json.toJson(_)).map(jsonResponse(_))
- } else {
- val rets = elementsToStore.map { element => QueueActor.router ! element; true }
- Future.successful(jsonResponse(Json.toJson(rets)))
- }
- } catch {
- case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
- case e: Throwable =>
- logger.error(s"mutateAndPublish: $e", e)
- Future.successful(InternalServerError(s"${e.getStackTrace}"))
- }
+ val elementsWithTsv = requestParser.parseBulkFormat(str)
+ tryMutate(elementsWithTsv, withWait)
}
def mutateBulk() = withHeaderAsync(parse.text) { request =>
- mutateAndPublish(request.body, withWait = false)
+ mutateBulkFormat(request.body, withWait = false)
}
def mutateBulkWithWait() = withHeaderAsync(parse.text) { request =>
- mutateAndPublish(request.body, withWait = true)
+ mutateBulkFormat(request.body, withWait = true)
}
def inserts() = withHeaderAsync(jsonParser) { request =>
- tryMutates(request.body, "insert")
+ mutateJsonFormat(request.body, "insert")
}
def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
- tryMutates(request.body, "insert", withWait = true)
+ mutateJsonFormat(request.body, "insert", withWait = true)
}
def insertsBulk() = withHeaderAsync(jsonParser) { request =>
- tryMutates(request.body, "insertBulk")
+ mutateJsonFormat(request.body, "insertBulk")
}
def deletes() = withHeaderAsync(jsonParser) { request =>
- tryMutates(request.body, "delete")
+ mutateJsonFormat(request.body, "delete")
}
def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
- tryMutates(request.body, "delete", withWait = true)
+ mutateJsonFormat(request.body, "delete", withWait = true)
}
def updates() = withHeaderAsync(jsonParser) { request =>
- tryMutates(request.body, "update")
+ mutateJsonFormat(request.body, "update")
}
def updatesWithWait() = withHeaderAsync(jsonParser) { request =>
- tryMutates(request.body, "update", withWait = true)
+ mutateJsonFormat(request.body, "update", withWait = true)
}
def increments() = withHeaderAsync(jsonParser) { request =>
- tryMutates(request.body, "increment")
+ mutateJsonFormat(request.body, "increment")
}
def incrementsWithWait() = withHeaderAsync(jsonParser) { request =>
- tryMutates(request.body, "increment", withWait = true)
+ mutateJsonFormat(request.body, "increment", withWait = true)
}
def incrementCounts() = withHeaderAsync(jsonParser) { request =>
val jsValue = request.body
- val edges = requestParser.toEdges(jsValue, "incrementCount")
+ val edgesWithTsv = requestParser.parseJsonFormat(jsValue, "incrementCount")
- s2.incrementCounts(edges, withWait = true).map { results =>
- val json = results.map { case (isSuccess, resultCount) =>
- Json.obj("success" -> isSuccess, "result" -> resultCount)
- }
+ val edges = for {
+ (e, _tsv) <- edgesWithTsv if !skipElement(e.isAsync)
+ } yield e
- jsonResponse(Json.toJson(json))
+ if (edges.isEmpty) Future.successful(jsonResponse(JsArray()))
+ else {
+ s2.incrementCounts(edges, withWait = true).map { results =>
+ val json = results.map { case (isSuccess, resultCount) =>
+ Json.obj("success" -> isSuccess, "result" -> resultCount)
+ }
+ jsonResponse(Json.toJson(json))
+ }
}
}
@@ -199,10 +185,8 @@ object EdgeController extends Controller {
id <- ids
label <- labels
} yield {
- val tsv = Seq(ts, "deleteAll", "e", jsToStr(id), jsToStr(id), label.label, "{}", direction).mkString("\t")
- val topic = topicOpt.getOrElse {
- if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
- }
+ val tsv = Seq(ts, "deleteAll", "e", requestParser.jsToStr(id), requestParser.jsToStr(id), label.label, "{}", direction).mkString("\t")
+ val topic = topicOpt.getOrElse { toKafkaTopic(label.isAsync) }
ExceptionHandler.toKafkaMessage(topic, tsv)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
index 1a037ae..0260b7a 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
@@ -69,6 +69,6 @@ object PublishController extends Controller {
// }
// }
def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request =>
- EdgeController.mutateAndPublish(request.body)
+ EdgeController.mutateBulkFormat(request.body)
}
}
[10/12] incubator-s2graph git commit: Merge branch 'master' into
S2GRAPH-125
Posted by st...@apache.org.
Merge branch 'master' into S2GRAPH-125
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/714c68c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/714c68c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/714c68c6
Branch: refs/heads/master
Commit: 714c68c6266efa58e9522cdc31c3f4763e9a7c81
Parents: 244a93a 6610285
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 11 12:34:36 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Nov 11 12:34:36 2016 +0900
----------------------------------------------------------------------
CHANGES | 3 +++
project/Publisher.scala | 8 +++++---
2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[09/12] incubator-s2graph git commit: change ratio from Int to Double.
Posted by st...@apache.org.
change ratio from Int to Double.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/244a93a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/244a93a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/244a93a9
Branch: refs/heads/master
Commit: 244a93a99f545d019b016b567c2f35ff051c9548
Parents: 3f31309
Author: daewon <da...@apache.org>
Authored: Fri Jun 24 10:30:50 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 21:43:33 2016 +0900
----------------------------------------------------------------------
.../apache/s2graph/rest/play/controllers/EdgeController.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/244a93a9/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index b78b778..da88c3d 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -58,8 +58,8 @@ object EdgeController extends Controller {
(walLogOpt \ "method").as[JsValue] match {
case JsString("drop") => // pass
case JsString("sample") =>
- val rate = (walLogOpt \ "rate").as[Int]
- if (Random.nextInt(100) < rate) enqueue(kafkaTopic, e, tsv)
+ val rate = (walLogOpt \ "rate").as[Double]
+ if (scala.util.Random.nextDouble() < rate) enqueue(kafkaTopic, e, tsv)
case _ => enqueue(kafkaTopic, e, tsv)
}
}
[06/12] incubator-s2graph git commit: Merge branch 'master' into
S2GRAPH-125
Posted by st...@apache.org.
Merge branch 'master' into S2GRAPH-125
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/5d392f54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/5d392f54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/5d392f54
Branch: refs/heads/master
Commit: 5d392f54ca960f5234b3e473eebf38f85be94cb3
Parents: 83ffcd5 b58ba20
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Nov 10 10:09:45 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 10:09:45 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../loader/subscriber/TransferToHFile.scala | 4 +-
.../scala/org/apache/s2graph/core/Edge.scala | 10 +-
.../org/apache/s2graph/core/JSONParser.scala | 235 +++++++++++++++++--
.../org/apache/s2graph/core/Management.scala | 3 +-
.../org/apache/s2graph/core/PostProcess.scala | 4 +-
.../org/apache/s2graph/core/mysqls/Label.scala | 9 +-
.../apache/s2graph/core/mysqls/LabelMeta.scala | 4 +-
.../s2graph/core/mysqls/ServiceColumn.scala | 2 +-
.../s2graph/core/parsers/WhereParser.scala | 6 +-
.../s2graph/core/rest/RequestParser.scala | 4 +-
.../apache/s2graph/core/JsonParserTest.scala | 3 +-
.../org/apache/s2graph/rest/netty/Server.scala | 2 +-
.../rest/play/controllers/QueryController.scala | 2 +-
14 files changed, 243 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5d392f54/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5d392f54/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5d392f54/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 7d4eeee,760816e..2734211
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@@ -19,11 -19,13 +19,13 @@@
package org.apache.s2graph.core.mysqls
+
import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
+ import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.utils.logger
- import org.apache.s2graph.core.{GraphExceptions, GraphUtil, JSONParser}
- import play.api.libs.json.{JsValue, JsObject, Json}
+ import org.apache.s2graph.core.JSONParser._
-import play.api.libs.json.Json
++import play.api.libs.json._
import scalikejdbc._
object Label extends Model[Label] {
@@@ -267,8 -264,7 +269,9 @@@ case class Label(id: Option[Int], label
isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong",
hTableName: String, hTableTTL: Option[Int],
schemaVersion: String, isAsync: Boolean = false,
- compressionAlgorithm: String) {
+ compressionAlgorithm: String,
- options: Option[String]) extends JSONParser {
++ options: Option[String]) {
++
def metas = LabelMeta.findAllByLabelId(id.get)
def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5d392f54/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
[05/12] incubator-s2graph git commit: Change default action
Posted by st...@apache.org.
Change default action
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/83ffcd5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/83ffcd5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/83ffcd5c
Branch: refs/heads/master
Commit: 83ffcd5c947ddcc1e566b1134143ffe50daece77
Parents: b5a44d9
Author: daewon <da...@apache.org>
Authored: Mon May 2 15:40:44 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 08:01:41 2016 +0900
----------------------------------------------------------------------
.../apache/s2graph/core/storage/Storage.scala | 20 +++++++++++---------
1 file changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/83ffcd5c/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index f5cc4f4..2d7d198 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -1322,16 +1322,18 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
}
- def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] =
- edge.label.extraOptions.get("skipVertex") match {
- case Some(v) if v == false =>
- if (edge.op == GraphUtil.operations("delete"))
- buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
- else
- vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
- case _ => Seq.empty
- }
+ def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] = {
+ val storeVertex = edge.label.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false)
+ if (storeVertex) {
+ if (edge.op == GraphUtil.operations("delete"))
+ buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
+ else
+ vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
+ } else {
+ Seq.empty
+ }
+ }
def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = {
vertex.op match {
[02/12] incubator-s2graph git commit: bug fix.
Posted by st...@apache.org.
bug fix.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4bce75bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4bce75bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4bce75bc
Branch: refs/heads/master
Commit: 4bce75bc4c7822b8582503c1d4b83f80d20764fe
Parents: 0e58f55
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Mar 16 16:04:29 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 05:47:12 2016 +0900
----------------------------------------------------------------------
.../src/main/scala/org/apache/s2graph/core/mysqls/Label.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4bce75bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index ebbb4ba..7d4eeee 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -79,12 +79,13 @@ object Label extends Model[Label] {
insert into labels(label,
src_service_id, src_column_name, src_column_type,
tgt_service_id, tgt_column_name, tgt_column_type,
- is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, is_async, compressionAlgorithm)
+ is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, is_async,
+ compressionAlgorithm, options)
values (${label},
${srcServiceId}, ${srcColumnName}, ${srcColumnType},
${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType},
${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, ${hTableName}, ${hTableTTL},
- ${schemaVersion}, ${isAsync}, ${compressionAlgorithm})
+ ${schemaVersion}, ${isAsync}, ${compressionAlgorithm}, ${options})
"""
.updateAndReturnGeneratedKey.apply()
}
[04/12] incubator-s2graph git commit: skip storing vertex's belongsTo
property when insert edge unless label's extra option is set as
{"skipVertex": false}.
Posted by st...@apache.org.
Skip storing the vertex's belongsTo property when inserting an edge, unless the label's extra option is set to {"skipVertex": false}.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b5a44d93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b5a44d93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b5a44d93
Branch: refs/heads/master
Commit: b5a44d9321c46f2d4c3296ea6d05e0198bd06e3c
Parents: 03af01e
Author: DO YUNG YOON <st...@apache.org>
Authored: Tue Mar 29 17:34:20 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 06:05:37 2016 +0900
----------------------------------------------------------------------
.../org/apache/s2graph/core/storage/Storage.scala | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5a44d93/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 70e47a7..f5cc4f4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -326,6 +326,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
val (_, edgeUpdate) =
if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge)
else Edge.buildOperation(None, Seq(edge))
+
buildVertexPutsAsync(edge) ++ indexedEdgeMutations(edgeUpdate) ++
snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
}
@@ -1322,10 +1323,15 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] =
- if (edge.op == GraphUtil.operations("delete"))
- buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
- else
- vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
+ edge.label.extraOptions.get("skipVertex") match {
+ case Some(v) if v == false =>
+ if (edge.op == GraphUtil.operations("delete"))
+ buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
+ else
+ vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
+ case _ => Seq.empty
+ }
+
def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = {
vertex.op match {
[03/12] incubator-s2graph git commit: bug fix on skipReverse options.
Posted by st...@apache.org.
bug fix on skipReverse options.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/03af01e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/03af01e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/03af01e1
Branch: refs/heads/master
Commit: 03af01e1928b0370a19db82b9c863abc56360e94
Parents: 4bce75b
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Mar 16 17:02:03 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 10 05:48:04 2016 +0900
----------------------------------------------------------------------
s2core/src/main/scala/org/apache/s2graph/core/Edge.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/03af01e1/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 8813313..cb7b820 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -155,12 +155,12 @@ case class Edge(srcVertex: Vertex,
def relatedEdges = {
if (labelWithDir.isDirected) {
- List(this, duplicateEdge)
+ val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
+ if (skipReverse) List(this) else List(this, duplicateEdge)
} else {
val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
val base = copy(labelWithDir = outDir)
- val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
- if (skipReverse) List(base) else List(base, base.reverseSrcTgtEdge)
+ List(base, base.reverseSrcTgtEdge)
}
}
[12/12] incubator-s2graph git commit: [S2GRAPH-125]: Add options
field on Label model for controlling advanced options.
Posted by st...@apache.org.
[S2GRAPH-125]: Add options field on Label model for controlling advanced options.
JIRA:
[S2GRAPH-125] https://issues.apache.org/jira/browse/S2GRAPH-125
Pull Request:
Closes #94
Authors
DO YUNG YOON: steamshon@apache.org
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b5908311
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b5908311
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b5908311
Branch: refs/heads/master
Commit: b5908311a8c8e4dded5ed8694cd5780d6e622a80
Parents: 33f7b69
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Nov 14 18:48:24 2016 +0100
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Nov 14 18:48:58 2016 +0100
----------------------------------------------------------------------
CHANGES | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5908311/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 148edde..0c8edc3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -43,6 +43,8 @@ Release 0.1.0 - unreleased
S2GRAPH-12: Add Label Name Swap Feature.
(Contributed by Hyunsung Jo<hy...@gmail.com>, committed by DOYUNG YOON).
+ S2GRAPH-125: Add options field on Label model for controlling advanced options (Committed by DOYUNG YOON).
+
IMPROVEMENT
S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON).
[11/12] incubator-s2graph git commit: close HBaseAdmin resource when
createTable finished.
Posted by st...@apache.org.
Close the HBaseAdmin resource when createTable finishes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/33f7b69b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/33f7b69b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/33f7b69b
Branch: refs/heads/master
Commit: 33f7b69bfd7e8b51638328aef579052fecdb8eec
Parents: 714c68c
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Nov 14 18:47:14 2016 +0100
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Nov 14 18:47:14 2016 +0100
----------------------------------------------------------------------
.../core/storage/hbase/AsynchbaseStorage.scala | 55 +++++++++++---------
1 file changed, 30 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/33f7b69b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 7391259..138216b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -380,33 +380,38 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
val admin = getAdmin(zkAddr)
val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
- if (!admin.tableExists(TableName.valueOf(tableName))) {
- try {
- val desc = new HTableDescriptor(TableName.valueOf(tableName))
- desc.setDurability(Durability.ASYNC_WAL)
- for (cf <- cfs) {
- val columnDesc = new HColumnDescriptor(cf)
- .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
- .setBloomFilterType(BloomType.ROW)
- .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
- .setMaxVersions(1)
- .setTimeToLive(2147483647)
- .setMinVersions(0)
- .setBlocksize(32768)
- .setBlockCacheEnabled(true)
- if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
- desc.addFamily(columnDesc)
- }
+ try {
+ if (!admin.tableExists(TableName.valueOf(tableName))) {
+ try {
+ val desc = new HTableDescriptor(TableName.valueOf(tableName))
+ desc.setDurability(Durability.ASYNC_WAL)
+ for (cf <- cfs) {
+ val columnDesc = new HColumnDescriptor(cf)
+ .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+ .setBloomFilterType(BloomType.ROW)
+ .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+ .setMaxVersions(1)
+ .setTimeToLive(2147483647)
+ .setMinVersions(0)
+ .setBlocksize(32768)
+ .setBlockCacheEnabled(true)
+ if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+ desc.addFamily(columnDesc)
+ }
- if (regionCount <= 1) admin.createTable(desc)
- else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
- } catch {
- case e: Throwable =>
- logger.error(s"$zkAddr, $tableName failed with $e", e)
- throw e
+ if (regionCount <= 1) admin.createTable(desc)
+ else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+ } catch {
+ case e: Throwable =>
+ logger.error(s"$zkAddr, $tableName failed with $e", e)
+ throw e
+ }
+ } else {
+ logger.info(s"$zkAddr, $tableName, $cfs already exist.")
}
- } else {
- logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+ } finally {
+ admin.close()
+ admin.getConnection.close()
}
}