You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 17:23:21 UTC
incubator-s2graph git commit: [S2GRAPH-123]: Support different index on out/in direction
Repository: incubator-s2graph
Updated Branches:
refs/heads/master 66bdf1bc0 -> f7154bac9
[S2GRAPH-123]: Support different index on out/in direction
JIRA:
[S2GRAPH-123] https://issues.apache.org/jira/browse/S2GRAPH-123
Pull Request:
Closes #98
Authors
DO YUNG YOON: steamshon@apache.org
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f7154bac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f7154bac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f7154bac
Branch: refs/heads/master
Commit: f7154bac9d18f7a7a16098587f69483a773cf2b6
Parents: 66bdf1b
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Nov 16 18:22:58 2016 +0100
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Nov 16 18:22:58 2016 +0100
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/s2graph/core/mysqls/schema.sql | 21 ++---
.../scala/org/apache/s2graph/core/Edge.scala | 22 +++++-
.../org/apache/s2graph/core/Management.scala | 7 +-
.../org/apache/s2graph/core/mysqls/Label.scala | 4 +-
.../apache/s2graph/core/mysqls/LabelIndex.scala | 81 ++++++++++++++++----
.../s2graph/core/rest/RequestParser.scala | 36 +++++++--
.../apache/s2graph/core/storage/Storage.scala | 5 +-
8 files changed, 137 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 01d1cc8..1c5c01e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -44,6 +44,8 @@ Release 0.1.0 - unreleased
(Contributed by Hyunsung Jo<hy...@gmail.com>, committed by DOYUNG YOON).
S2GRAPH-125: Add options field on Label model for controlling advanced options (Committed by DOYUNG YOON).
+
+ S2GRAPH-123: Support different index on out/in direction (Committed by DOYUNG YOON).
IMPROVEMENT
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
index 822df6c..b5e09c9 100644
--- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
+++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
@@ -136,15 +136,18 @@ ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELET
-- ----------------------------
DROP TABLE IF EXISTS `label_indices`;
CREATE TABLE `label_indices` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `label_id` int(11) NOT NULL,
- `name` varchar(64) NOT NULL DEFAULT '_PK',
- `seq` tinyint(4) NOT NULL,
- `meta_seqs` varchar(64) NOT NULL,
- `formulars` varchar(255) DEFAULT NULL,
- PRIMARY KEY (`id`),
- UNIQUE KEY `ux_label_indices_label_id_seq` (`label_id`,`meta_seqs`),
- UNIQUE KEY `ux_label_indices_label_id_name` (`label_id`,`name`)
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ `label_id` int(11) NOT NULL,
+ `name` varchar(64) NOT NULL DEFAULT '_PK',
+ `seq` tinyint(4) NOT NULL,
+ `meta_seqs` varchar(64) NOT NULL,
+ `formulars` varchar(255) DEFAULT NULL,
+ `dir` int DEFAULT NULL,
+ `options` text,
+ PRIMARY KEY (`id`),
+ -- no (label_id, meta_seqs) key: the same props may be indexed once per dir. MySQL does not dedupe NULL-dir (both-direction) rows in ux_label_id_meta_seqs_dir
+ UNIQUE KEY `ux_label_id_name` (`label_id`,`name`),
+ UNIQUE KEY `ux_label_id_meta_seqs_dir` (`label_id`,`meta_seqs`,`dir`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE;
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 657cfed..8a2784d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -448,6 +448,24 @@ object Edge {
}
}
+ def filterOutWithLabelOption(ls: Seq[IndexEdge]): Seq[IndexEdge] = ls.filter { ie => // keep IndexEdges whose LabelIndex covers their direction and whose WriteOption (if any) samples them in
+ ie.labelIndex.dir match {
+ case None =>
+ // dir is None: the index is shared by both directions, as defined at label creation.
+ true
+ case Some(dir) =>
+ if (dir != ie.direction) {
+ // the index belongs to the other direction, so this IndexEdge is left untouched.
+ false
+ } else { // NOTE(review): this filter also runs on edgesToDelete, so method "sample" may skip deleting entries that were written - confirm
+ ie.labelIndex.writeOption.forall { option =>
+ val hashValueOpt = ie.orders.collectFirst { case (k, v) if k == LabelMeta.fromHash => v.value.toString.toLong }
+ option.sample(ie, hashValueOpt)
+ }
+ }
+ }
+ }
+
def buildMutation(snapshotEdgeOpt: Option[Edge],
requestEdge: Edge,
newVersion: Long,
@@ -477,7 +495,7 @@ object Edge {
val edgesToDelete = snapshotEdgeOpt match {
case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
snapshotEdge.copy(op = GraphUtil.defaultOpByte)
- .relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+ .relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
case _ => Nil
}
@@ -488,7 +506,7 @@ object Edge {
version = newVersion,
propsWithTs = newPropsWithTs,
op = GraphUtil.defaultOpByte
- ).relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+ ).relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
EdgeMutate(edgesToDelete = edgesToDelete,
edgesToInsert = edgesToInsert,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 89acc63..60900be 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -43,8 +43,7 @@ object Management {
object Prop extends ((String, String, String) => Prop)
- case class Index(name: String, propNames: Seq[String])
-
+ case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None) // direction None = both directions; options = raw JSON string of the index write options
}
import HBaseType._
@@ -135,7 +134,7 @@ object Management {
indices.foreach { index =>
val metaSeq = index.propNames.map { name => labelMetaMap(name).seq }
- LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none")
+ LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none", index.direction, index.options)
}
label
@@ -340,7 +339,7 @@ class Management(graph: Graph) {
val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
val allProps = old.metas(useCache = false).map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
- val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames) }
+ val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames, index.dir, index.options) }
createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index fdef677..09d15d7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -211,11 +211,11 @@ object Label extends Model[Label] {
if (indices.isEmpty) {
// make default index with _PK, _timestamp, 0
- LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none")
+ LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none", None, None)
} else {
indices.foreach { index =>
val metaSeq = index.propNames.map { name => labelMetaMap(name) }
- LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none")
+ LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none", index.direction, index.options)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
index c548868..7b1cd07 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
@@ -24,6 +24,7 @@ package org.apache.s2graph.core.mysqls
*/
import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.mysqls.LabelIndex.WriteOption
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsObject, JsString, Json}
import scalikejdbc._
@@ -39,7 +40,30 @@ object LabelIndex extends Model[LabelIndex] {
rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match {
case metaSeqsList => metaSeqsList
},
- rs.string("formulars"))
+ rs.string("formulars"),
+ rs.intOpt("dir"),
+ rs.stringOpt("options")
+ )
+ }
+ /** Per-index write policy; LabelIndex.writeOption builds it from the index's `options` JSON. */
+ object WriteOption {
+ val Default = WriteOption()
+ }
+ case class WriteOption(method: String = "default",
+ rate: Double = 1.0,
+ totalModular: Long = 100,
+ storeDegree: Boolean = true) {
+
+ /** Whether to write an edge: "drop" never, "sample" with probability `rate`, "hash_sample" when
+ * (|hash| % totalModular) / totalModular < rate using the required `_from_hash` value, any other method always. */
+ def sample[T](a: T, hashOpt: Option[Long]): Boolean = method match {
+ case "drop" => false
+ case "sample" => scala.util.Random.nextDouble() < rate
+ case "hash_sample" =>
+ val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value"))
+ (hash.abs % totalModular) / totalModular.toDouble < rate
+ case _ => true
+ }
}
def findById(id: Int)(implicit session: DBSession = AutoSession) = {
@@ -62,24 +86,27 @@ object LabelIndex extends Model[LabelIndex] {
}
}
- def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String)(implicit session: DBSession = AutoSession): Long = {
+ def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String,
+ direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
sql"""
- insert into label_indices(label_id, name, seq, meta_seqs, formulars)
- values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars})
+ insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options)
+ values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}, ${direction}, ${options})
"""
.updateAndReturnGeneratedKey.apply()
}
- def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String)(implicit session: DBSession = AutoSession): LabelIndex = {
- findByLabelIdAndSeqs(labelId, metaSeqs) match {
+ def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String,
+ direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = {
+ findByLabelIdAndSeqs(labelId, metaSeqs, direction) match {
case Some(s) => s
case None =>
val orders = findByLabelIdAll(labelId, false)
val seq = (orders.size + 1).toByte
assert(seq <= MaxOrderSeq)
- val createdId = insert(labelId, indexName, seq, metaSeqs, formulars)
+ val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options)
val cacheKeys = List(s"labelId=$labelId:seq=$seq",
- s"labelId=$labelId:seqs=$metaSeqs", s"labelId=$labelId:seq=$seq", s"id=$createdId")
+ s"labelId=$labelId:seqs=${metaSeqs.mkString(",")}:dir=$direction", s"labelId=$labelId:seq=$seq", s"id=$createdId")
+ // keys must use the same format as findByLabelIdAndSeqs / putsToCache (seqs joined with ","), or cached lookups are never expired
cacheKeys.foreach { key =>
expireCache(key)
expireCaches(key)
@@ -89,11 +116,11 @@ object LabelIndex extends Model[LabelIndex] {
}
}
- def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
- val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",")
+ def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
+ val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction // dir=None (both directions) must be matched with "dir is null": "dir = NULL" is never true in SQL
withCache(cacheKey) {
sql"""
- select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")}
+ select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and ${direction.fold(sqls"dir is null")(d => sqls"dir = ${d}")}
""".map { rs => LabelIndex(rs) }.single.apply
}
}
@@ -118,7 +145,7 @@ object LabelIndex extends Model[LabelIndex] {
val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
sql"""delete from label_indices where id = ${id}""".execute.apply()
- val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs")
+ val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}")
cacheKeys.foreach { key =>
expireCache(key)
expireCaches(key)
@@ -136,7 +163,7 @@ object LabelIndex extends Model[LabelIndex] {
(cacheKey -> x)
})
putsToCache(ls.map { x =>
- var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}"
+ var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
(cacheKey -> x)
})
putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
@@ -146,14 +173,38 @@ object LabelIndex extends Model[LabelIndex] {
}
}
-case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String) {
+case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
+ dir: Option[Int], options: Option[String]) {
+ // dir: None = index used for both directions (rendered as "both"); options: raw JSON, parsed by writeOption
lazy val label = Label.findById(labelId)
lazy val metas = label.metaPropsMap
lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq))
lazy val sortKeyTypesArray = sortKeyTypes.toArray
lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name }
+ // only toJson uses these, so they are lazy like the members above (no JSON parse per loaded row)
+ lazy val dirJs = dir.map(GraphUtil.fromDirection).getOrElse("both")
+ lazy val optionsJs = try { options.map(Json.parse).getOrElse(Json.obj()) } catch { case e: Exception => Json.obj() }
lazy val toJson = Json.obj(
"name" -> name,
- "propNames" -> sortKeyTypes.map(x => x.name)
+ "propNames" -> sortKeyTypes.map(x => x.name),
+ "dir" -> dirJs,
+ "options" -> optionsJs
)
+ // None when options is absent or cannot be parsed (including a missing "method"); parse failures are logged
+ lazy val writeOption: Option[WriteOption] = try {
+ options.map { string =>
+ val jsObj = Json.parse(string)
+ val method = (jsObj \ "method").as[String]
+
+ val rate = (jsObj \ "rate").asOpt[Double].getOrElse(1.0)
+ val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L)
+ val storeDegree = (jsObj \ "degree").asOpt[Boolean].getOrElse(true)
+
+ WriteOption(method, rate, totalModular, storeDegree)
+ }
+ } catch {
+ case e: Exception =>
+ logger.error(s"Parse failed labelOption: ${this.label}", e)
+ None
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 5466a9a..8baf787 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -611,11 +611,23 @@ class RequestParser(graph: Graph) {
Prop(propName, defaultValue, dataType)
}
- def toIndicesElements(jsValue: JsLookupResult): Seq[Index] = for {
- jsObj <- jsValue.as[Seq[JsValue]]
- indexName = (jsObj \ "name").as[String]
- propNames = (jsObj \ "propNames").as[Seq[String]]
- } yield Index(indexName, propNames)
+ def toIndicesElements(jsValue: JsLookupResult): Seq[Index] = {
+ val indices = for {
+ jsObj <- jsValue.as[Seq[JsValue]]
+ indexName = (jsObj \ "name").as[String]
+ propNames = (jsObj \ "propNames").as[Seq[String]]
+ direction = (jsObj \ "direction").asOpt[String].map(GraphUtil.toDirection)
+ options = (jsObj \ "options").asOpt[JsValue].map(_.toString)
+ } yield {
+ Index(indexName, propNames, direction, options)
+ }
+ // NOTE(review): the input key is "direction" while LabelIndex.toJson emits "dir" - confirm clients send "direction"
+ val (pk, others) = indices.partition(index => index.name == LabelIndex.DefaultName)
+ val (both, inOut) = others.partition(index => index.direction.isEmpty)
+ val (in, out) = inOut.partition(index => index.direction.contains(GraphUtil.directions("in")))
+ // _PK first, then both-direction, in-only, out-only: callers insert in this order and findOrInsert assigns seq = existing count + 1
+ pk ++ both ++ in ++ out
+ }
def toLabelElements(jsValue: JsValue) = Try {
val labelName = parse[String](jsValue, "label")
@@ -642,8 +654,8 @@ class RequestParser(graph: Graph) {
val options = (jsValue \ "options").asOpt[JsValue].map(_.toString())
(labelName, srcServiceName, srcColumnName, srcColumnType,
- tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName,
- indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
+ tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName,
+ indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
}
def toIndexElements(jsValue: JsValue) = Try {
@@ -703,6 +715,16 @@ class RequestParser(graph: Graph) {
(labels, direction, ids, ts, vertices)
}
+ def toFetchAndDeleteParam(json: JsValue) = { // -> (label, from, to, direction (default "out"), index, props); only "label" is required
+ val labelName = (json \ "label").as[String]
+ val fromOpt = (json \ "from").asOpt[JsValue]
+ val toOpt = (json \ "to").asOpt[JsValue]
+ val direction = (json \ "direction").asOpt[String].getOrElse("out")
+ val indexOpt = (json \ "index").asOpt[String]
+ val propsOpt = (json \ "props").asOpt[JsObject]
+ (labelName, fromOpt, toOpt, direction, indexOpt, propsOpt)
+ }
+
def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = jsQuery.as[Seq[JsObject]].map { obj =>
def _require(field: String) = throw new RuntimeException(s"${field} not found")
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 07e39aa..f2b07cd 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -1035,8 +1035,9 @@ abstract class Storage[Q, R](val graph: Graph,
def incrementsInOut(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = {
- def filterOutDegree(e: IndexEdge): Boolean = true
-
+ def filterOutDegree(e: IndexEdge): Boolean = // the index's WriteOption.storeDegree, true when it has none. NOTE(review): the name reads as "true = filter out" but true means store - confirm against callers
+ e.labelIndex.writeOption.fold(true)(_.storeDegree)
+
(edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
case (true, true) =>