You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/12/28 01:20:50 UTC
[1/2] incubator-s2graph git commit: [S2GRAPH-132] add functionality
for buffering increment
Repository: incubator-s2graph
Updated Branches:
refs/heads/master 146094b50 -> c099da6fb
[S2GRAPH-132] add functionality for buffering increment
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/478db844
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/478db844
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/478db844
Branch: refs/heads/master
Commit: 478db844a0c7322f51c2e75f2fbee1bf4c492902
Parents: 20bdf92
Author: daewon <da...@apache.org>
Authored: Fri Dec 2 23:52:26 2016 +0900
Committer: daewon <da...@apache.org>
Committed: Fri Dec 2 23:52:26 2016 +0900
----------------------------------------------------------------------
.../scala/org/apache/s2graph/core/S2Edge.scala | 72 +++++++++++++++++++-
.../scala/org/apache/s2graph/core/S2Graph.scala | 5 +-
.../apache/s2graph/core/mysqls/LabelIndex.scala | 8 +--
.../apache/s2graph/core/storage/Storage.scala | 22 +++---
.../core/storage/hbase/AsynchbaseStorage.scala | 2 +-
5 files changed, 92 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 5c2a5dc..9408ed5 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -35,6 +35,30 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MutableMap}
import scala.util.hashing.MurmurHash3
+object SnapshotEdge {
+
+ def copyFrom(e: SnapshotEdge): SnapshotEdge = {
+ val copy =
+ SnapshotEdge(
+ e.graph,
+ e.srcVertex,
+ e.tgtVertex,
+ e.label,
+ e.dir,
+ e.op,
+ e.version,
+ S2Edge.EmptyProps,
+ e.pendingEdgeOpt,
+ e.statusCode,
+ e.lockTs,
+ e.tsInnerValOpt)
+
+ copy.updatePropsWithTs(e.propsWithTs)
+
+ copy
+ }
+}
+
case class SnapshotEdge(graph: S2Graph,
srcVertex: S2Vertex,
tgtVertex: S2Vertex,
@@ -73,6 +97,18 @@ case class SnapshotEdge(graph: S2Graph,
jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
} yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
+ def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = {
+ if (others.isEmpty) propsWithTs
+ else {
+ val iter = others.entrySet().iterator()
+ while (iter.hasNext) {
+ val e = iter.next()
+ propsWithTs.put(e.getKey, e.getValue)
+ }
+ propsWithTs
+ }
+ }
+
// only for debug
def toLogString() = {
List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t")
@@ -104,6 +140,27 @@ case class SnapshotEdge(graph: S2Graph,
}
}
+object IndexEdge {
+ def copyFrom(e: IndexEdge): IndexEdge = {
+ val copy = IndexEdge(
+ e.graph,
+ e.srcVertex,
+ e.tgtVertex,
+ e.label,
+ e.dir,
+ e.op,
+ e.version,
+ e.labelIndexSeq,
+ S2Edge.EmptyProps,
+ e.tsInnerValOpt
+ )
+
+ copy.updatePropsWithTs(e.propsWithTs)
+
+ copy
+ }
+}
+
case class IndexEdge(graph: S2Graph,
srcVertex: S2Vertex,
tgtVertex: S2Vertex,
@@ -579,6 +636,11 @@ case class S2Edge(innerGraph: S2Graph,
object EdgeMutate {
+
+ def partitionBufferedIncrement(edges: Seq[IndexEdge]): (Seq[IndexEdge], Seq[IndexEdge]) = {
+ edges.partition(_.indexOption.fold(false)(_.isBufferIncrement))
+ }
+
def filterIndexOptionForDegree(edges: Seq[IndexEdge]): Seq[IndexEdge] = edges.filter { ie =>
ie.indexOption.fold(true)(_.storeDegree)
}
@@ -598,6 +660,12 @@ case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
newSnapshotEdge: Option[SnapshotEdge] = None) {
+ def deepCopy: EdgeMutate = copy(
+ edgesToDelete = edgesToDelete.map(IndexEdge.copyFrom),
+ edgesToInsert = edgesToInsert.map(IndexEdge.copyFrom),
+ newSnapshotEdge = newSnapshotEdge.map(SnapshotEdge.copyFrom)
+ )
+
val edgesToInsertWithIndexOpt: Seq[IndexEdge] = EdgeMutate.filterIndexOption(edgesToInsert)
val edgesToDeleteWithIndexOpt: Seq[IndexEdge] = EdgeMutate.filterIndexOption(edgesToDelete)
@@ -821,9 +889,7 @@ object S2Edge {
}
- EdgeMutate(edgesToDelete = edgesToDelete,
- edgesToInsert = edgesToInsert,
- newSnapshotEdge = newSnapshotEdgeOpt)
+ EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index b3f3ac8..1b022f7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -991,7 +991,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
else S2Edge.buildOperation(None, Seq(edge))
- storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate) ++ storage.snapshotEdgeMutations(edgeUpdate) ++ storage.increments(edgeUpdate)
+ val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
+
+ if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
+ storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
}
storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
index fa61149..7d4d715 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.mysqls
import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.mysqls.LabelIndex.WriteOption
+import org.apache.s2graph.core.mysqls.LabelIndex.indexOption
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsObject, JsString, Json}
import scalikejdbc._
@@ -42,7 +42,7 @@ object LabelIndex extends Model[LabelIndex] {
)
}
- case class WriteOption(dir: Byte,
+ case class indexOption(dir: Byte,
method: String,
rate: Double,
totalModular: Long,
@@ -191,7 +191,7 @@ case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, me
)
}
- def parseOption(dir: String): Option[WriteOption] = try {
+ def parseOption(dir: String): Option[indexOption] = try {
options.map { string =>
val jsObj = Json.parse(string) \ dir
@@ -200,7 +200,7 @@ case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, me
val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L)
val storeDegree = (jsObj \ "storeDegree").asOpt[Boolean].getOrElse(true)
- WriteOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
+ indexOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
}
} catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 421bec3..f31c0ec 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -362,9 +362,11 @@ abstract class Storage[Q, R](val graph: S2Graph,
val futures = edges.map { edge =>
val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+ val (bufferIncr, nonBufferIncr) = increments(edgeUpdate.deepCopy)
val mutations =
- indexedEdgeMutations(edgeUpdate) ++ snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
+ indexedEdgeMutations(edgeUpdate.deepCopy) ++ snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+ if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
writeToStorage(zkQuorum, mutations, withWait)
}
@@ -764,8 +766,10 @@ abstract class Storage[Q, R](val graph: S2Graph,
val p = Random.nextDouble()
if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 2, s"$p"))
else {
- val incrs = increments(edgeMutate)
- _write(incrs, true)
+ val (bufferIncr, nonBufferIncr) = increments(edgeMutate.deepCopy)
+
+ if (bufferIncr.nonEmpty) _write(bufferIncr, withWait = false)
+ _write(nonBufferIncr, withWait = true)
}
}
}
@@ -1037,23 +1041,25 @@ abstract class Storage[Q, R](val graph: S2Graph,
def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
edgeMutate.newSnapshotEdge.map(e => snapshotEdgeSerializer(e).toKeyValues.map(_.copy(durability = e.label.durability))).getOrElse(Nil)
- def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] = {
+ def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = {
(edgeMutate.edgesToDeleteWithIndexOptForDegree.isEmpty, edgeMutate.edgesToInsertWithIndexOptForDegree.isEmpty) match {
case (true, true) =>
/** when there is no need to update. shouldUpdate == false */
- Nil
+ Nil -> Nil
case (true, false) =>
/** no edges to delete but there is new edges to insert so increase degree by 1 */
- edgeMutate.edgesToInsertWithIndexOptForDegree.flatMap(buildIncrementsAsync(_))
+ val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToInsertWithIndexOptForDegree)
+ buffer.flatMap(buildIncrementsAsync(_)) -> nonBuffer.flatMap(buildIncrementsAsync(_))
case (false, true) =>
/** no edges to insert but there is old edges to delete so decrease degree by 1 */
- edgeMutate.edgesToDeleteWithIndexOptForDegree.flatMap(buildIncrementsAsync(_, -1))
+ val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToDeleteWithIndexOptForDegree)
+ buffer.flatMap(buildIncrementsAsync(_, -1)) -> nonBuffer.flatMap(buildIncrementsAsync(_, -1))
case (false, false) =>
/** update on existing edges so no change on degree */
- Nil
+ Nil -> Nil
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 93b2454..5a9235e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -432,7 +432,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
} yield {
val futures: List[Deferred[(Boolean, Long, Long)]] = for {
relEdge <- edge.relatedEdges
- edgeWithIndex <- relEdge.edgesWithIndexValid
+ edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
} yield {
val countWithTs = edge.propertyValueInner(LabelMeta.count)
val countVal = countWithTs.innerVal.toString().toLong
[2/2] incubator-s2graph git commit: [S2GRAPH-132] Support buffering
for 'Increment RPC'
Posted by st...@apache.org.
[S2GRAPH-132] Support buffering for 'Increment RPC'
JIRA:
[S2GRAPH-132] https://issues.apache.org/jira/browse/S2GRAPH-132
Pull Request:
Closes #105
Author
daewon <bl...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c099da6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c099da6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c099da6f
Branch: refs/heads/master
Commit: c099da6fb988ba2b1f0f2081b5c807c978eaf041
Parents: 146094b 478db84
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Dec 28 10:21:44 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Dec 28 10:22:06 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../scala/org/apache/s2graph/core/S2Edge.scala | 72 +++++++++++++++++++-
.../scala/org/apache/s2graph/core/S2Graph.scala | 5 +-
.../apache/s2graph/core/mysqls/LabelIndex.scala | 8 +--
.../apache/s2graph/core/storage/Storage.scala | 22 +++---
.../core/storage/hbase/AsynchbaseStorage.scala | 2 +-
6 files changed, 94 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c099da6f/CHANGES
----------------------------------------------------------------------
diff --cc CHANGES
index 8328fec,860d2f5..4a0d5ce
--- a/CHANGES
+++ b/CHANGES
@@@ -99,8 -99,6 +99,10 @@@ Release 0.1.0 - unrelease
S2GRAPH-121: Create `Result` class to hold traverse result edges (Committed by DOYUNG YOON).
S2GRAPH-122: Change data types of Edge/IndexEdge/SnapshotEdge (Committed by DOYUNG YOON).
+
+ S2GRAPH-135: Change the way LabelIndexOption is implemented and improve it (Committed by daewon).
++
++ S2GRAPH-132: Support buffering for 'Increment RPC' (Committed by daewon).
BUG FIXES
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c099da6f/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 1472786,9408ed5..7859218
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@@ -571,9 -634,13 +628,14 @@@ case class S2Edge(innerGraph: S2Graph
override def label(): String = innerLabel.label
}
+case class EdgeId(srcVertexId: InnerValLike, tgtVertexId: InnerValLike, labelName: String, direction: String)
object EdgeMutate {
+
+ def partitionBufferedIncrement(edges: Seq[IndexEdge]): (Seq[IndexEdge], Seq[IndexEdge]) = {
+ edges.partition(_.indexOption.fold(false)(_.isBufferIncrement))
+ }
+
def filterIndexOptionForDegree(edges: Seq[IndexEdge]): Seq[IndexEdge] = edges.filter { ie =>
ie.indexOption.fold(true)(_.storeDegree)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c099da6f/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c099da6f/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c099da6f/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------