You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 17:23:21 UTC

incubator-s2graph git commit: [S2GRAPH-123]: Support different index on out/in direction

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 66bdf1bc0 -> f7154bac9


[S2GRAPH-123]: Support different index on out/in direction

JIRA:
  [S2GRAPH-123] https://issues.apache.org/jira/browse/S2GRAPH-123

Pull Request:
  Closes #98

Authors
  DO YUNG YOON: steamshon@apache.org


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f7154bac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f7154bac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f7154bac

Branch: refs/heads/master
Commit: f7154bac9d18f7a7a16098587f69483a773cf2b6
Parents: 66bdf1b
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Nov 16 18:22:58 2016 +0100
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Nov 16 18:22:58 2016 +0100

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/s2graph/core/mysqls/schema.sql   | 21 ++---
 .../scala/org/apache/s2graph/core/Edge.scala    | 22 +++++-
 .../org/apache/s2graph/core/Management.scala    |  7 +-
 .../org/apache/s2graph/core/mysqls/Label.scala  |  4 +-
 .../apache/s2graph/core/mysqls/LabelIndex.scala | 81 ++++++++++++++++----
 .../s2graph/core/rest/RequestParser.scala       | 36 +++++++--
 .../apache/s2graph/core/storage/Storage.scala   |  5 +-
 8 files changed, 137 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 01d1cc8..1c5c01e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -44,6 +44,8 @@ Release 0.1.0 - unreleased
 		(Contributed by Hyunsung Jo<hy...@gmail.com>, committed by DOYUNG YOON).
 
     S2GRAPH-125: Add options field on Label model for controlling advanced options (Committed by DOYUNG YOON).
+   
+    S2GRAPH-123: Support different index on out/in direction (Committed by DOYUNG YOON).
 
   IMPROVEMENT
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
index 822df6c..b5e09c9 100644
--- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
+++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
@@ -136,15 +136,18 @@ ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELET
 -- ----------------------------
 DROP TABLE IF EXISTS `label_indices`;
 CREATE TABLE `label_indices` (
-  `id` int(11) NOT NULL AUTO_INCREMENT,
-  `label_id` int(11) NOT NULL,
-  `name` varchar(64) NOT NULL DEFAULT '_PK',
-  `seq` tinyint(4) NOT NULL,
-  `meta_seqs` varchar(64) NOT NULL,
-  `formulars` varchar(255) DEFAULT NULL,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_label_indices_label_id_seq` (`label_id`,`meta_seqs`),
-  UNIQUE KEY `ux_label_indices_label_id_name` (`label_id`,`name`)
+	`id` int(11) NOT NULL AUTO_INCREMENT,
+	`label_id` int(11) NOT NULL,
+	`name` varchar(64) NOT NULL DEFAULT '_PK',
+	`seq` tinyint(4) NOT NULL,
+	`meta_seqs` varchar(64) NOT NULL,
+	`formulars` varchar(255) DEFAULT NULL,
+	`dir` int DEFAULT NULL, -- NULL: the index applies to both directions
+	`options` text,
+	PRIMARY KEY (`id`),
+	UNIQUE KEY `ux_label_id_seq` (`label_id`,`meta_seqs`),
+	UNIQUE KEY `ux_label_id_name` (`label_id`,`name`),
+	UNIQUE KEY `ux_label_id_meta_seqs_dir` (`label_id`,`meta_seqs`,`dir`) -- NOTE(review): implied by ux_label_id_seq, which still forbids reusing meta_seqs across directions
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
 ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE;

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 657cfed..8a2784d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -448,6 +448,24 @@ object Edge {
     }
   }
 
+  def filterOutWithLabelOption(ls: Seq[IndexEdge]): Seq[IndexEdge] = ls.filter { ie =>
+    ie.labelIndex.dir match {
+      case None =>
+        // no direction on this index: it applies to both directions, and its writeOption is not consulted.
+        true
+      case Some(dir) =>
+        if (dir != ie.direction) {
+          // this index is bound to a different direction than the index edge, so skip it.
+          false
+        } else {
+          ie.labelIndex.writeOption.map { option =>
+            val hashValueOpt = ie.orders.find { case (k, v) => k == LabelMeta.fromHash }.map{ case (k, v) => v.value.toString.toLong }
+            option.sample(ie, hashValueOpt)
+          }.getOrElse(true) // no options, or options that failed to parse: keep the edge
+        }
+    }
+  }
+
   def buildMutation(snapshotEdgeOpt: Option[Edge],
                     requestEdge: Edge,
                     newVersion: Long,
@@ -477,7 +495,7 @@ object Edge {
         val edgesToDelete = snapshotEdgeOpt match {
           case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
             snapshotEdge.copy(op = GraphUtil.defaultOpByte)
-              .relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+              .relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
           case _ => Nil
         }
 
@@ -488,7 +506,7 @@ object Edge {
               version = newVersion,
               propsWithTs = newPropsWithTs,
               op = GraphUtil.defaultOpByte
-            ).relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+            ).relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
 
         EdgeMutate(edgesToDelete = edgesToDelete,
           edgesToInsert = edgesToInsert,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 89acc63..60900be 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -43,8 +43,7 @@ object Management {
 
     object Prop extends ((String, String, String) => Prop)
 
-    case class Index(name: String, propNames: Seq[String])
-
+    case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None) // direction None = both directions; options = raw JSON string
   }
 
   import HBaseType._
@@ -135,7 +134,7 @@ object Management {
 
       indices.foreach { index =>
         val metaSeq = index.propNames.map { name => labelMetaMap(name).seq }
-        LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none")
+        LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none", index.direction, index.options)
       }
 
       label
@@ -340,7 +339,7 @@ class Management(graph: Graph) {
     val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
 
     val allProps = old.metas(useCache = false).map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
-    val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames) }
+    val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames, index.dir, index.options) }
 
     createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
       old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index fdef677..09d15d7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -211,11 +211,11 @@ object Label extends Model[Label] {
 
           if (indices.isEmpty) {
             // make default index with _PK, _timestamp, 0
-            LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none")
+            LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none", None, None) // no direction/options: serves both directions
           } else {
             indices.foreach { index =>
               val metaSeq = index.propNames.map { name => labelMetaMap(name) }
-              LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none")
+              LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none", index.direction, index.options)
             }
           }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
index c548868..7b1cd07 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
@@ -24,6 +24,7 @@ package org.apache.s2graph.core.mysqls
  */
 
 import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.mysqls.LabelIndex.WriteOption
 import org.apache.s2graph.core.utils.logger
 import play.api.libs.json.{JsObject, JsString, Json}
 import scalikejdbc._
@@ -39,7 +40,30 @@ object LabelIndex extends Model[LabelIndex] {
       rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match {
         case metaSeqsList => metaSeqsList
       },
-      rs.string("formulars"))
+      rs.string("formulars"),
+      rs.intOpt("dir"),
+      rs.stringOpt("options")
+    )
+  }
+  object WriteOption {
+    val Default = WriteOption()
+  }
+  case class WriteOption(method: String = "default",
+                         rate: Double = 1.0,
+                         totalModular: Long = 100,
+                         storeDegree: Boolean = true) {
+
+    def sample[T](a: T, hashOpt: Option[Long]): Boolean = { // true = keep (write) the index edge
+      if (method == "drop") false
+      else if (method == "sample") {
+        if (scala.util.Random.nextDouble() < rate) true
+        else false
+      } else if (method == "hash_sample") {
+        val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value"))
+        if ((hash % totalModular).abs / totalModular.toDouble < rate) true // reduce first: Long.MinValue.abs stays negative
+        else false
+      } else true
+    }
   }
 
   def findById(id: Int)(implicit session: DBSession = AutoSession) = {
@@ -62,24 +86,27 @@ object LabelIndex extends Model[LabelIndex] {
     }
   }
 
-  def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String)(implicit session: DBSession = AutoSession): Long = {
+  def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String,
+             direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
     sql"""
-    	insert into label_indices(label_id, name, seq, meta_seqs, formulars)
-    	values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars})
+    	insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options)
+    	values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}, ${direction}, ${options})
     """
       .updateAndReturnGeneratedKey.apply()
   }
 
-  def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String)(implicit session: DBSession = AutoSession): LabelIndex = {
-    findByLabelIdAndSeqs(labelId, metaSeqs) match {
+  def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String,
+                   direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = {
+    findByLabelIdAndSeqs(labelId, metaSeqs, direction) match {
       case Some(s) => s
       case None =>
         val orders = findByLabelIdAll(labelId, false)
         val seq = (orders.size + 1).toByte
         assert(seq <= MaxOrderSeq)
-        val createdId = insert(labelId, indexName, seq, metaSeqs, formulars)
+        val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options)
         val cacheKeys = List(s"labelId=$labelId:seq=$seq",
-          s"labelId=$labelId:seqs=$metaSeqs", s"labelId=$labelId:seq=$seq", s"id=$createdId")
+          s"labelId=$labelId:seqs=${metaSeqs.mkString(",")}:dir=$direction", s"labelId=$labelId:seq=$seq", s"id=$createdId") // must match findByLabelIdAndSeqs' key
+
         cacheKeys.foreach { key =>
           expireCache(key)
           expireCaches(key)
@@ -89,11 +116,11 @@ object LabelIndex extends Model[LabelIndex] {
     }
   }
 
-  def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
-    val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",")
+  def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
+    val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction // `dir = NULL` never matches in SQL, so None needs `dir is null` below
     withCache(cacheKey) {
       sql"""
-      select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")}
+      select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and ${direction.fold(sqls"dir is null")(d => sqls"dir = ${d}")}
       """.map { rs => LabelIndex(rs) }.single.apply
     }
   }
@@ -118,7 +145,7 @@ object LabelIndex extends Model[LabelIndex] {
     val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
     sql"""delete from label_indices where id = ${id}""".execute.apply()
 
-    val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs")
+    val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}") // NOTE(review): assumes seqs is already mkString(",")-formatted like findByLabelIdAndSeqs' key — confirm
     cacheKeys.foreach { key =>
       expireCache(key)
       expireCaches(key)
@@ -136,7 +163,7 @@ object LabelIndex extends Model[LabelIndex] {
       (cacheKey -> x)
     })
     putsToCache(ls.map { x =>
-      var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}"
+      val cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
       (cacheKey -> x)
     })
     putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
@@ -146,14 +173,38 @@ object LabelIndex extends Model[LabelIndex] {
   }
 }
 
-case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String) {
+case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
+                      dir: Option[Int], options: Option[String]) {
+  // dir = None means the index applies to both directions (rendered as "both" in toJson)
   lazy val label = Label.findById(labelId)
   lazy val metas = label.metaPropsMap
   lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq))
   lazy val sortKeyTypesArray = sortKeyTypes.toArray
   lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name }
+
+  lazy val dirJs = dir.map(GraphUtil.fromDirection).getOrElse("both")
+  lazy val optionsJs = try { options.map(Json.parse).getOrElse(Json.obj()) } catch { case e: Exception => Json.obj() }
   lazy val toJson = Json.obj(
     "name" -> name,
-    "propNames" -> sortKeyTypes.map(x => x.name)
+    "propNames" -> sortKeyTypes.map(x => x.name),
+    "dir" -> dirJs,
+    "options" -> optionsJs
   )
+
+  lazy val writeOption: Option[WriteOption] = try {
+    options.map { string =>
+      val jsObj = Json.parse(string)
+      val method = (jsObj \ "method").as[String]
+
+      val rate = (jsObj \ "rate").asOpt[Double].getOrElse(1.0)
+      val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L)
+      val storeDegree = (jsObj \ "degree").asOpt[Boolean].getOrElse(true)
+
+      WriteOption(method, rate, totalModular, storeDegree)
+    }
+  } catch {
+    case e: Exception =>
+      logger.error(s"Parse failed labelOption: ${this.label}", e)
+      None
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 5466a9a..8baf787 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -611,11 +611,23 @@ class RequestParser(graph: Graph) {
     Prop(propName, defaultValue, dataType)
   }
 
-  def toIndicesElements(jsValue: JsLookupResult): Seq[Index] = for {
-    jsObj <- jsValue.as[Seq[JsValue]]
-    indexName = (jsObj \ "name").as[String]
-    propNames = (jsObj \ "propNames").as[Seq[String]]
-  } yield Index(indexName, propNames)
+  def toIndicesElements(jsValue: JsLookupResult): Seq[Index] = {
+    val indices = for {
+      jsObj <- jsValue.as[Seq[JsValue]]
+      indexName = (jsObj \ "name").as[String]
+      propNames = (jsObj \ "propNames").as[Seq[String]]
+      direction = (jsObj \ "direction").asOpt[String].map(GraphUtil.toDirection)
+      options = (jsObj \ "options").asOpt[JsValue].map(_.toString)
+    } yield {
+      Index(indexName, propNames, direction, options)
+    }
+
+    val (pk, others) = indices.partition(index => index.name == LabelIndex.DefaultName)
+    val (both, inOut) = others.partition(index => index.direction.isEmpty)
+    val (in, out) = inOut.partition(index => index.direction.contains(GraphUtil.directions("in")))
+
+    pk ++ both ++ in ++ out // NOTE(review): order likely fixes each index's seq (assigned in insertion order) — confirm
+  }
 
   def toLabelElements(jsValue: JsValue) = Try {
     val labelName = parse[String](jsValue, "label")
@@ -642,8 +654,8 @@ class RequestParser(graph: Graph) {
     val options = (jsValue \ "options").asOpt[JsValue].map(_.toString())
 
     (labelName, srcServiceName, srcColumnName, srcColumnType,
-      tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName,
-      indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
+        tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName,
+        indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
   }
 
   def toIndexElements(jsValue: JsValue) = Try {
@@ -703,6 +715,16 @@ class RequestParser(graph: Graph) {
     (labels, direction, ids, ts, vertices)
   }
 
+  def toFetchAndDeleteParam(json: JsValue) = {
+    val labelName = (json \ "label").as[String]
+    val fromOpt = (json \ "from").asOpt[JsValue]
+    val toOpt = (json \ "to").asOpt[JsValue]
+    val direction = (json \ "direction").asOpt[String].getOrElse("out")
+    val indexOpt = (json \ "index").asOpt[String]
+    val propsOpt = (json \ "props").asOpt[JsObject]
+    (labelName, fromOpt, toOpt, direction, indexOpt, propsOpt)
+  }
+
   def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = jsQuery.as[Seq[JsObject]].map { obj =>
     def _require(field: String) = throw new RuntimeException(s"${field} not found")
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 07e39aa..f2b07cd 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -1035,8 +1035,9 @@ abstract class Storage[Q, R](val graph: Graph,
 
   def incrementsInOut(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = {
 
-    def filterOutDegree(e: IndexEdge): Boolean = true
-
+    def filterOutDegree(e: IndexEdge): Boolean =
+      e.labelIndex.writeOption.fold(true)(_.storeDegree)
+      
     (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
       case (true, true) =>