You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/12/28 01:20:50 UTC

[1/2] incubator-s2graph git commit: [S2GRAPH-132] add functionality for buffering increment

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 146094b50 -> c099da6fb


[S2GRAPH-132] add functionality for buffering increment


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/478db844
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/478db844
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/478db844

Branch: refs/heads/master
Commit: 478db844a0c7322f51c2e75f2fbee1bf4c492902
Parents: 20bdf92
Author: daewon <da...@apache.org>
Authored: Fri Dec 2 23:52:26 2016 +0900
Committer: daewon <da...@apache.org>
Committed: Fri Dec 2 23:52:26 2016 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/core/S2Edge.scala  | 72 +++++++++++++++++++-
 .../scala/org/apache/s2graph/core/S2Graph.scala |  5 +-
 .../apache/s2graph/core/mysqls/LabelIndex.scala |  8 +--
 .../apache/s2graph/core/storage/Storage.scala   | 22 +++---
 .../core/storage/hbase/AsynchbaseStorage.scala  |  2 +-
 5 files changed, 92 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 5c2a5dc..9408ed5 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -35,6 +35,30 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{Map => MutableMap}
 import scala.util.hashing.MurmurHash3
 
+object SnapshotEdge {
+
+  def copyFrom(e: SnapshotEdge): SnapshotEdge = {
+    val copy =
+      SnapshotEdge(
+        e.graph,
+        e.srcVertex,
+        e.tgtVertex,
+        e.label,
+        e.dir,
+        e.op,
+        e.version,
+        S2Edge.EmptyProps,
+        e.pendingEdgeOpt,
+        e.statusCode,
+        e.lockTs,
+        e.tsInnerValOpt)
+
+    copy.updatePropsWithTs(e.propsWithTs)
+
+    copy
+  }
+}
+
 case class SnapshotEdge(graph: S2Graph,
                         srcVertex: S2Vertex,
                         tgtVertex: S2Vertex,
@@ -73,6 +97,18 @@ case class SnapshotEdge(graph: S2Graph,
     jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
   } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
 
+  def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = {
+    if (others.isEmpty) propsWithTs
+    else {
+      val iter = others.entrySet().iterator()
+      while (iter.hasNext) {
+        val e = iter.next()
+        propsWithTs.put(e.getKey, e.getValue)
+      }
+      propsWithTs
+    }
+  }
+
   // only for debug
   def toLogString() = {
     List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t")
@@ -104,6 +140,27 @@ case class SnapshotEdge(graph: S2Graph,
   }
 }
 
+object IndexEdge {
+  def copyFrom(e: IndexEdge): IndexEdge = {
+    val copy = IndexEdge(
+      e.graph,
+      e.srcVertex,
+      e.tgtVertex,
+      e.label,
+      e.dir,
+      e.op,
+      e.version,
+      e.labelIndexSeq,
+      S2Edge.EmptyProps,
+      e.tsInnerValOpt
+    )
+
+    copy.updatePropsWithTs(e.propsWithTs)
+
+    copy
+  }
+}
+
 case class IndexEdge(graph: S2Graph,
                      srcVertex: S2Vertex,
                      tgtVertex: S2Vertex,
@@ -579,6 +636,11 @@ case class S2Edge(innerGraph: S2Graph,
 
 
 object EdgeMutate {
+
+  def partitionBufferedIncrement(edges: Seq[IndexEdge]): (Seq[IndexEdge], Seq[IndexEdge]) = {
+    edges.partition(_.indexOption.fold(false)(_.isBufferIncrement))
+  }
+
   def filterIndexOptionForDegree(edges: Seq[IndexEdge]): Seq[IndexEdge] = edges.filter { ie =>
     ie.indexOption.fold(true)(_.storeDegree)
   }
@@ -598,6 +660,12 @@ case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
                       edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
                       newSnapshotEdge: Option[SnapshotEdge] = None) {
 
+  def deepCopy: EdgeMutate = copy(
+    edgesToDelete = edgesToDelete.map(IndexEdge.copyFrom),
+    edgesToInsert = edgesToInsert.map(IndexEdge.copyFrom),
+    newSnapshotEdge = newSnapshotEdge.map(SnapshotEdge.copyFrom)
+  )
+
   val edgesToInsertWithIndexOpt: Seq[IndexEdge] = EdgeMutate.filterIndexOption(edgesToInsert)
 
   val edgesToDeleteWithIndexOpt: Seq[IndexEdge] = EdgeMutate.filterIndexOption(edgesToDelete)
@@ -821,9 +889,7 @@ object S2Edge {
           }
 
 
-        EdgeMutate(edgesToDelete = edgesToDelete,
-          edgesToInsert = edgesToInsert,
-          newSnapshotEdge = newSnapshotEdgeOpt)
+        EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index b3f3ac8..1b022f7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -991,7 +991,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
             if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
             else S2Edge.buildOperation(None, Seq(edge))
 
-          storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate) ++ storage.snapshotEdgeMutations(edgeUpdate) ++ storage.increments(edgeUpdate)
+          val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
+
+          if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
+          storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
         }
 
         storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
index fa61149..7d4d715 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.mysqls
 
 import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.mysqls.LabelIndex.WriteOption
+import org.apache.s2graph.core.mysqls.LabelIndex.indexOption
 import org.apache.s2graph.core.utils.logger
 import play.api.libs.json.{JsObject, JsString, Json}
 import scalikejdbc._
@@ -42,7 +42,7 @@ object LabelIndex extends Model[LabelIndex] {
     )
   }
 
-  case class WriteOption(dir: Byte,
+  case class indexOption(dir: Byte,
                          method: String,
                          rate: Double,
                          totalModular: Long,
@@ -191,7 +191,7 @@ case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, me
     )
   }
 
-  def parseOption(dir: String): Option[WriteOption] = try {
+  def parseOption(dir: String): Option[indexOption] = try {
     options.map { string =>
       val jsObj = Json.parse(string) \ dir
 
@@ -200,7 +200,7 @@ case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, me
       val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L)
       val storeDegree = (jsObj \ "storeDegree").asOpt[Boolean].getOrElse(true)
 
-      WriteOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
+      indexOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
     }
   } catch {
     case e: Exception =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 421bec3..f31c0ec 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -362,9 +362,11 @@ abstract class Storage[Q, R](val graph: S2Graph,
       val futures = edges.map { edge =>
         val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
 
+        val (bufferIncr, nonBufferIncr) = increments(edgeUpdate.deepCopy)
         val mutations =
-          indexedEdgeMutations(edgeUpdate) ++ snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
+          indexedEdgeMutations(edgeUpdate.deepCopy) ++ snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
 
+        if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
 
         writeToStorage(zkQuorum, mutations, withWait)
       }
@@ -764,8 +766,10 @@ abstract class Storage[Q, R](val graph: S2Graph,
       val p = Random.nextDouble()
       if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 2, s"$p"))
       else {
-        val incrs = increments(edgeMutate)
-        _write(incrs, true)
+        val (bufferIncr, nonBufferIncr) = increments(edgeMutate.deepCopy)
+
+        if (bufferIncr.nonEmpty) _write(bufferIncr, withWait = false)
+        _write(nonBufferIncr, withWait = true)
       }
     }
   }
@@ -1037,23 +1041,25 @@ abstract class Storage[Q, R](val graph: S2Graph,
   def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
     edgeMutate.newSnapshotEdge.map(e => snapshotEdgeSerializer(e).toKeyValues.map(_.copy(durability = e.label.durability))).getOrElse(Nil)
 
-  def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] = {
+  def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = {
     (edgeMutate.edgesToDeleteWithIndexOptForDegree.isEmpty, edgeMutate.edgesToInsertWithIndexOptForDegree.isEmpty) match {
       case (true, true) =>
         /** when there is no need to update. shouldUpdate == false */
-        Nil
+        Nil -> Nil
 
       case (true, false) =>
         /** no edges to delete but there is new edges to insert so increase degree by 1 */
-        edgeMutate.edgesToInsertWithIndexOptForDegree.flatMap(buildIncrementsAsync(_))
+        val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToInsertWithIndexOptForDegree)
+        buffer.flatMap(buildIncrementsAsync(_)) -> nonBuffer.flatMap(buildIncrementsAsync(_))
 
       case (false, true) =>
         /** no edges to insert but there is old edges to delete so decrease degree by 1 */
-        edgeMutate.edgesToDeleteWithIndexOptForDegree.flatMap(buildIncrementsAsync(_, -1))
+        val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToDeleteWithIndexOptForDegree)
+        buffer.flatMap(buildIncrementsAsync(_, -1)) -> nonBuffer.flatMap(buildIncrementsAsync(_, -1))
 
       case (false, false) =>
         /** update on existing edges so no change on degree */
-        Nil
+        Nil -> Nil
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 93b2454..5a9235e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -432,7 +432,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
     } yield {
         val futures: List[Deferred[(Boolean, Long, Long)]] = for {
           relEdge <- edge.relatedEdges
-          edgeWithIndex <- relEdge.edgesWithIndexValid
+          edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
         } yield {
           val countWithTs = edge.propertyValueInner(LabelMeta.count)
           val countVal = countWithTs.innerVal.toString().toLong


[2/2] incubator-s2graph git commit: [S2GRAPH-132] Support buffering for 'Increment RPC'

Posted by st...@apache.org.
[S2GRAPH-132] Support buffering for 'Increment RPC'

JIRA:
    [S2GRAPH-132] https://issues.apache.org/jira/browse/S2GRAPH-132

Pull Request:
    Closes #105

Author
    daewon <bl...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c099da6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c099da6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c099da6f

Branch: refs/heads/master
Commit: c099da6fb988ba2b1f0f2081b5c807c978eaf041
Parents: 146094b 478db84
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Dec 28 10:21:44 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Dec 28 10:22:06 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../scala/org/apache/s2graph/core/S2Edge.scala  | 72 +++++++++++++++++++-
 .../scala/org/apache/s2graph/core/S2Graph.scala |  5 +-
 .../apache/s2graph/core/mysqls/LabelIndex.scala |  8 +--
 .../apache/s2graph/core/storage/Storage.scala   | 22 +++---
 .../core/storage/hbase/AsynchbaseStorage.scala  |  2 +-
 6 files changed, 94 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c099da6f/CHANGES
----------------------------------------------------------------------
diff --cc CHANGES
index 8328fec,860d2f5..4a0d5ce
--- a/CHANGES
+++ b/CHANGES
@@@ -99,8 -99,6 +99,10 @@@ Release 0.1.0 - unrelease
      S2GRAPH-121: Create `Result` class to hold traverse result edges (Committed by DOYUNG YOON).
   
      S2GRAPH-122: Change data types of Edge/IndexEdge/SnapshotEdge (Committed by DOYUNG YOON).
 + 
 +    S2GRAPH-135: Change the way LabelIndexOption is implemented and improve it (Committed by daewon).
++
++    S2GRAPH-132: Support buffering for 'Increment RPC' (Committed by daewon).
  
    BUG FIXES
  

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c099da6f/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 1472786,9408ed5..7859218
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@@ -571,9 -634,13 +628,14 @@@ case class S2Edge(innerGraph: S2Graph
    override def label(): String = innerLabel.label
  }
  
 +case class EdgeId(srcVertexId: InnerValLike, tgtVertexId: InnerValLike, labelName: String, direction: String)
  
  object EdgeMutate {
+ 
+   def partitionBufferedIncrement(edges: Seq[IndexEdge]): (Seq[IndexEdge], Seq[IndexEdge]) = {
+     edges.partition(_.indexOption.fold(false)(_.isBufferIncrement))
+   }
+ 
    def filterIndexOptionForDegree(edges: Seq[IndexEdge]): Seq[IndexEdge] = edges.filter { ie =>
      ie.indexOption.fold(true)(_.storeDegree)
    }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c099da6f/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c099da6f/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c099da6f/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------