You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/14 12:29:50 UTC
[06/25] incubator-s2graph git commit: Abstract Mutator.
Abstract Mutator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b69421b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b69421b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b69421b0
Branch: refs/heads/master
Commit: b69421b0b40083c5fd98b4f8139a41bf9a8ff55f
Parents: 6f9d852
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu May 3 14:27:48 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu May 3 14:27:48 2018 +0900
----------------------------------------------------------------------
s2core/build.sbt | 3 +-
.../scala/org/apache/s2graph/core/Mutator.scala | 22 +++
.../scala/org/apache/s2graph/core/S2Graph.scala | 26 ++-
.../org/apache/s2graph/core/S2GraphLike.scala | 4 +
.../apache/s2graph/core/TraversalHelper.scala | 4 +-
.../core/storage/DefaultOptimisticMutator.scala | 171 +++++++++++++++++
.../core/storage/OptimisticMutator.scala | 46 +++++
.../apache/s2graph/core/storage/Storage.scala | 33 ++--
.../s2graph/core/storage/StorageWritable.scala | 64 -------
.../storage/WriteWriteConflictResolver.scala | 2 +-
.../core/storage/hbase/AsynchbaseStorage.scala | 3 +-
.../hbase/AsynchbaseStorageWritable.scala | 11 +-
.../core/storage/rocks/RocksStorage.scala | 11 +-
.../storage/rocks/RocksStorageWritable.scala | 10 +-
.../core/storage/serde/MutationHelper.scala | 190 -------------------
15 files changed, 302 insertions(+), 298 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 6e062cc..0b83c3d 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -56,7 +56,8 @@ libraryDependencies ++= Seq(
"com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(),
"com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(),
"org.scala-lang.modules" %% "scala-pickling" % "0.10.1",
- "com.spotify" % "annoy" % "0.2.5"
+ "com.spotify" % "annoy" % "0.2.5",
+ "org.tensorflow" % "tensorflow" % tensorflowVersion
)
libraryDependencies := {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
new file mode 100644
index 0000000..a97dcff
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
@@ -0,0 +1,22 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait Mutator {
+ def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+ def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]]
+
+ def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]]
+
+ def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]]
+
+ def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+ def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+ requestTs: Long,
+ retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean]
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 43ab92c..4b2274a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -260,6 +260,15 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
else getStorage(label).reader
}
+ override def getMutator(column: ServiceColumn): Mutator = {
+ getStorage(column.service).mutator
+ }
+
+ override def getMutator(label: Label): Mutator = {
+ getStorage(label).mutator
+ }
+
+ //TODO:
override def flushStorage(): Unit = {
storagePool.foreach { case (_, storage) =>
@@ -315,7 +324,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike],
withWait: Boolean = false): Future[Seq[MutateResponse]] = {
val futures = vertices.map { vertex =>
- storage.mutateVertex(zkQuorum, vertex, withWait)
+ getMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait)
}
Future.sequence(futures)
}
@@ -342,12 +351,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) =>
- val storage = getStorage(label)
+ val mutator = getMutator(label)
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
/* multiple edges with weak consistency level will be processed as batch */
- storage.mutateWeakEdges(zkQuorum, edges, withWait)
+ mutator.mutateWeakEdges(zkQuorum, edges, withWait)
}
Future.sequence(futures)
}
@@ -360,9 +369,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) =>
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
- val storage = getStorage(label)
+ val mutator = getMutator(label)
val zkQuorum = label.hbaseZkAddr
- storage.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
+
+ mutator.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
idxs.zip(rets)
}
}
@@ -487,7 +497,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
override def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
val edgesWithIdx = edges.zipWithIndex
val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
- getStorage(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+ getMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
}
Future.sequence(futures).map { ls =>
@@ -497,9 +507,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
override def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = {
val label = edge.innerLabel
- val storage = getStorage(label)
+ val mutator = getMutator(label)
- storage.updateDegree(label.hbaseZkAddr, edge, degreeVal)
+ mutator.updateDegree(label.hbaseZkAddr, edge, degreeVal)
}
override def getVertex(vertexId: VertexId): Option[S2VertexLike] = {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index fef0078..6ed78b0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -97,6 +97,10 @@ trait S2GraphLike extends Graph {
def getFetcher(label: Label): Fetcher
+ def getMutator(label: Label): Mutator
+
+ def getMutator(column: ServiceColumn): Mutator
+
def flushStorage(): Unit
def shutdown(modelDataDelete: Boolean = false): Unit
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 003a2d1..d19dd1f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -256,7 +256,7 @@ class TraversalHelper(graph: S2GraphLike) {
*/
graph.mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
} else {
- graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+ graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
}
case _ =>
@@ -264,7 +264,7 @@ class TraversalHelper(graph: S2GraphLike) {
* read: x
* write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
*/
- graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+ graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
}
ret
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
new file mode 100644
index 0000000..40f29c0
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
@@ -0,0 +1,171 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.LabelMeta
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+abstract class DefaultOptimisticMutator(graph: S2GraphLike,
+ serDe: StorageSerDe,
+ reader: StorageReadable) extends OptimisticMutator {
+ val fetcher = reader
+
+ lazy val io: StorageIO = new StorageIO(graph, serDe)
+ lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, this, reader)
+
+// private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+// mutator.writeToStorage(cluster, kvs, withWait)
+
+ def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+ requestTs: Long,
+ retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
+ if (stepInnerResult.isEmpty) Future.successful(true)
+ else {
+ val head = stepInnerResult.edgeWithScores.head
+ val zkQuorum = head.edge.innerLabel.hbaseZkAddr
+ val futures = for {
+ edgeWithScore <- stepInnerResult.edgeWithScores
+ } yield {
+ val edge = edgeWithScore.edge
+
+ val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+ val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+ val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+ val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
+ serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ io.buildIncrementsAsync(indexEdge, -1L)
+ }
+
+ /* reverted direction */
+ val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+ val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+ serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ io.buildIncrementsAsync(indexEdge, -1L)
+ }
+
+ val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
+ writeToStorage(zkQuorum, mutations, withWait = true)
+ }
+
+ Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
+ }
+ }
+
+ def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+ if (vertex.op == GraphUtil.operations("delete")) {
+ writeToStorage(zkQuorum,
+ serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
+ } else if (vertex.op == GraphUtil.operations("deleteAll")) {
+ logger.info(s"deleteAll for vertex is truncated. $vertex")
+ Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
+ } else {
+ writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
+ }
+ }
+
+ def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
+ val mutations = _edges.flatMap { edge =>
+ val (_, edgeUpdate) =
+ if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
+ else S2Edge.buildOperation(None, Seq(edge))
+
+ val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+
+ if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+ io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+ }
+
+ writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+ _edges.zipWithIndex.map { case (edge, idx) =>
+ idx -> ret.isSuccess
+ }
+ }
+ }
+
+ def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+ def mutateEdgesInner(edges: Seq[S2EdgeLike],
+ checkConsistency: Boolean,
+ withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+ assert(edges.nonEmpty)
+ // TODO:: remove after code review: unreachable code
+ if (!checkConsistency) {
+
+ val futures = edges.map { edge =>
+ val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+
+ val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+ val mutations =
+ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+
+ if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+
+ writeToStorage(zkQuorum, mutations, withWait)
+ }
+ Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
+ } else {
+ fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+ conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
+ }
+ }
+ }
+
+ val edgeWithIdxs = _edges.zipWithIndex
+ val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+ (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+ } toSeq
+
+ val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+ val edges = edgeGroup.map(_._1)
+ val idxs = edgeGroup.map(_._2)
+ // After deleteAll, process others
+ val mutateEdgeFutures = edges.toList match {
+ case head :: tail =>
+ val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
+
+ //TODO: decide what we will do on failure on vertex put
+ val puts = io.buildVertexPutsAsync(head)
+ val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
+ Seq(edgeFuture, vertexFuture)
+ case Nil => Nil
+ }
+
+ val composed = for {
+ // deleteRet <- Future.sequence(deleteAllFutures)
+ mutateRet <- Future.sequence(mutateEdgeFutures)
+ } yield mutateRet
+
+ composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
+ }
+
+ Future.sequence(mutateEdges).map { squashedRets =>
+ squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+ }
+ }
+
+ def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
+ val futures = for {
+ edge <- edges
+ } yield {
+ val kvs = for {
+ relEdge <- edge.relatedEdges
+ edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
+ } yield {
+ val countWithTs = edge.propertyValueInner(LabelMeta.count)
+ val countVal = countWithTs.innerVal.toString().toLong
+ io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
+ }
+ writeToStorage(zkQuorum, kvs, withWait = withWait)
+ }
+
+ Future.sequence(futures)
+ }
+
+ def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+ val kvs = io.buildDegreePuts(edge, degreeVal)
+
+ writeToStorage(zkQuorum, kvs, withWait = true)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
new file mode 100644
index 0000000..f9681a9
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
@@ -0,0 +1,46 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core.Mutator
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait OptimisticMutator extends Mutator {
+ /**
+ * Decides how to store the given key values Seq[SKeyValue] into storage using the storage's client.
+ * Note that this should return true only on complete success.
+ * We assume that each storage implementation has a client as a member variable.
+ *
+ * @param cluster : where this key values should be stored.
+ * @param kvs : sequence of SKeyValue that need to be stored in storage.
+ * @param withWait : flag to control whether to wait for an ack from storage.
+ * note that in AsynchbaseStorage (which supports asynchronous operations), even with true,
+ * it never blocks the thread, but rather submits the work and is notified by the event loop when storage sends the ack back.
+ * @return ack message from storage.
+ */
+ def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+ /**
+ * Writes requestKeyValue into storage if the value currently stored in storage matches the expected one.
+ * Note that we only use SnapshotEdge as the place for the lock, so this method only changes SnapshotEdge.
+ *
+ * Most importantly, this has to be an 'atomic' operation.
+ * While this operation is mutating requestKeyValue's snapshotEdge, other threads need to be
+ * either blocked or failed in the write-write conflict case.
+ *
+ * Also, while this method is still running, fetchSnapshotEdgeKeyValues should be synchronized to
+ * prevent reads from returning wrong data.
+ *
+ * It is best to use the storage's concurrency control (either pessimistic or optimistic), such as transactions
+ * or compareAndSet, to synchronize.
+ *
+ * For example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
+ * For storage that does not support concurrency control, the storage implementation
+ * itself can maintain manual locks that synchronize read (fetchSnapshotEdgeKeyValues)
+ * and write (writeLock).
+ *
+ * @param requestKeyValue
+ * @param expectedOpt
+ * @return
+ */
+ def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 6ad62b1..d2500a6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.storage
import com.typesafe.config.Config
import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.serde.{Deserializable, MutationHelper}
+import org.apache.s2graph.core.storage.serde.Deserializable
import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable
import org.apache.s2graph.core.types._
@@ -33,9 +33,6 @@ abstract class Storage(val graph: S2GraphLike,
/* Storage backend specific resource management */
val management: StorageManagement
- /* Physically store given KeyValue into backend storage. */
- val mutator: StorageWritable
-
/*
* Given QueryRequest/Vertex/Edge, fetch KeyValue from storage
* then convert them into Edge/Vertex
@@ -50,6 +47,11 @@ abstract class Storage(val graph: S2GraphLike,
val serDe: StorageSerDe
/*
+ * Responsible for connecting to the physical storage backend to store GraphElement(Edge/Vertex).
+ */
+ val mutator: Mutator
+
+ /*
* Common helper to translate SKeyValue to Edge/Vertex and vice versa.
* Note that it require storage backend specific implementation for serialize/deserialize.
*/
@@ -60,16 +62,9 @@ abstract class Storage(val graph: S2GraphLike,
* Note that it require storage backend specific implementations for
* all of StorageWritable, StorageReadable, StorageSerDe, StorageIO
*/
- lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader)
-
- lazy val mutationHelper: MutationHelper = new MutationHelper(this)
-
- /** Mutation **/
- def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
- mutator.writeToStorage(cluster, kvs, withWait)
+// lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader)
+// lazy val mutationHelper: MutationHelper = new MutationHelper(this)
- def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] =
- mutator.writeLock(requestKeyValue, expectedOpt)
/** Fetch **/
def fetches(queryRequests: Seq[QueryRequest],
@@ -102,21 +97,21 @@ abstract class Storage(val graph: S2GraphLike,
def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
requestTs: Long,
retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] =
- mutationHelper.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum)
+ mutator.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum)
def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
- mutationHelper.mutateVertex(zkQuorum: String, vertex, withWait)
+ mutator.mutateVertex(zkQuorum: String, vertex, withWait)
def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] =
- mutationHelper.mutateStrongEdges(zkQuorum, _edges, withWait)
+ mutator.mutateStrongEdges(zkQuorum, _edges, withWait)
def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] =
- mutationHelper.mutateWeakEdges(zkQuorum, _edges, withWait)
+ mutator.mutateWeakEdges(zkQuorum, _edges, withWait)
def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] =
- mutationHelper.incrementCounts(zkQuorum, edges, withWait)
+ mutator.incrementCounts(zkQuorum, edges, withWait)
def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] =
- mutationHelper.updateDegree(zkQuorum, edge, degreeVal)
+ mutator.updateDegree(zkQuorum, edge, degreeVal)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
deleted file mode 100644
index 80da3a9..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait StorageWritable {
- /**
- * decide how to store given key values Seq[SKeyValue] into storage using storage's client.
- * note that this should be return true on all success.
- * we assumes that each storage implementation has client as member variable.
- *
- *
- * @param cluster: where this key values should be stored.
- * @param kvs: sequence of SKeyValue that need to be stored in storage.
- * @param withWait: flag to control wait ack from storage.
- * note that in AsynchbaseStorage(which support asynchronous operations), even with true,
- * it never block thread, but rather submit work and notified by event loop when storage send ack back.
- * @return ack message from storage.
- */
- def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
-
- /**
- * write requestKeyValue into storage if the current value in storage that is stored matches.
- * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge.
- *
- * Most important thing is this have to be 'atomic' operation.
- * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be
- * either blocked or failed on write-write conflict case.
- *
- * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to
- * prevent wrong data for read.
- *
- * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction,
- * compareAndSet to synchronize.
- *
- * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
- * for storage that does not support concurrency control, then storage implementation
- * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues)
- * and write(writeLock).
- * @param requestKeyValue
- * @param expectedOpt
- * @return
- */
- def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
index dcef1cc..bfc5bc6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
@@ -31,7 +31,7 @@ import scala.util.Random
class WriteWriteConflictResolver(graph: S2GraphLike,
serDe: StorageSerDe,
io: StorageIO,
- mutator: StorageWritable,
+ mutator: OptimisticMutator,
fetcher: StorageReadable) {
val BackoffTimeout = graph.BackoffTimeout
val MaxRetryNum = graph.MaxRetryNum
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index e233277..4be3767 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -151,10 +151,9 @@ class AsynchbaseStorage(override val graph: S2GraphLike,
override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients)
- override val mutator: StorageWritable = new AsynchbaseStorageWritable(client, clientWithFlush)
-
override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
override val reader: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io)
+ override val mutator: Mutator = new AsynchbaseStorageWritable(graph, serDe, reader, client, clientWithFlush)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
index 7ca3782..b4236b9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
@@ -20,14 +20,19 @@
package org.apache.s2graph.core.storage.hbase
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.storage.{IncrementResponse, MutateResponse, SKeyValue, StorageWritable}
+import org.apache.s2graph.core.S2GraphLike
+import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.utils.{Extensions, logger}
import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest}
+
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
-class AsynchbaseStorageWritable(val client: HBaseClient,
- val clientWithFlush: HBaseClient) extends StorageWritable {
+class AsynchbaseStorageWritable(val graph: S2GraphLike,
+ val serDe: StorageSerDe,
+ val reader: StorageReadable,
+ val client: HBaseClient,
+ val clientWithFlush: HBaseClient) extends DefaultOptimisticMutator(graph, serDe, reader) {
import Extensions.DeferOps
private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
index e53aeb3..b24e375 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
@@ -26,7 +26,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.google.common.hash.Hashing
import com.typesafe.config.Config
import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.Storage
+import org.apache.s2graph.core.storage.{Storage, StorageManagement, StorageReadable, StorageSerDe}
import org.apache.s2graph.core.storage.rocks.RocksHelper.RocksRPC
import org.apache.s2graph.core.utils.logger
import org.rocksdb._
@@ -150,11 +150,12 @@ class RocksStorage(override val graph: S2GraphLike,
.maximumSize(1000 * 10 * 10 * 10 * 10)
.build[String, ReentrantLock](cacheLoader)
- override val management = new RocksStorageManagement(config, vdb, db)
+ override val management: StorageManagement = new RocksStorageManagement(config, vdb, db)
- override val mutator = new RocksStorageWritable(db, vdb, lockMap)
+ override val serDe: StorageSerDe = new RocksStorageSerDe(graph)
- override val serDe = new RocksStorageSerDe(graph)
+ override val reader: StorageReadable = new RocksStorageReadable(graph, config, db, vdb, serDe, io)
+
+ override val mutator: Mutator = new RocksStorageWritable(graph, serDe, reader, db, vdb, lockMap)
- override val reader = new RocksStorageReadable(graph, config, db, vdb, serDe, io)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
index 7ec147d..d29ccce 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
@@ -23,15 +23,19 @@ import java.util.concurrent.locks.ReentrantLock
import com.google.common.cache.{Cache, LoadingCache}
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue, StorageWritable}
+import org.apache.s2graph.core.S2GraphLike
+import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.utils.logger
import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions}
import scala.concurrent.{ExecutionContext, Future}
-class RocksStorageWritable(val db: RocksDB,
+class RocksStorageWritable(val graph: S2GraphLike,
+ val serDe: StorageSerDe,
+ val reader: StorageReadable,
+ val db: RocksDB,
val vdb: RocksDB,
- val lockMap: LoadingCache[String, ReentrantLock]) extends StorageWritable {
+ val lockMap: LoadingCache[String, ReentrantLock]) extends DefaultOptimisticMutator(graph, serDe, reader) {
override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
if (kvs.isEmpty) {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
deleted file mode 100644
index fecc6ea..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.serde
-
-import org.apache.s2graph.core.schema.LabelMeta
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.utils.logger
-
-import scala.concurrent.{ExecutionContext, Future}
-
-class MutationHelper(storage: Storage) {
- val serDe = storage.serDe
- val io = storage.io
- val fetcher = storage.reader
- val mutator = storage.mutator
- val conflictResolver = storage.conflictResolver
-
- private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
- mutator.writeToStorage(cluster, kvs, withWait)
-
- def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
- requestTs: Long,
- retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
- if (stepInnerResult.isEmpty) Future.successful(true)
- else {
- val head = stepInnerResult.edgeWithScores.head
- val zkQuorum = head.edge.innerLabel.hbaseZkAddr
- val futures = for {
- edgeWithScore <- stepInnerResult.edgeWithScores
- } yield {
- val edge = edgeWithScore.edge
-
- val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
- val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-
- val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
- val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
- serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- io.buildIncrementsAsync(indexEdge, -1L)
- }
-
- /* reverted direction */
- val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
- val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
- serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- io.buildIncrementsAsync(indexEdge, -1L)
- }
-
- val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-
- writeToStorage(zkQuorum, mutations, withWait = true)
- }
-
- Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
- }
- }
-
- def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
- if (vertex.op == GraphUtil.operations("delete")) {
- writeToStorage(zkQuorum,
- serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
- } else if (vertex.op == GraphUtil.operations("deleteAll")) {
- logger.info(s"deleteAll for vertex is truncated. $vertex")
- Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
- } else {
- writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
- }
- }
-
- def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
- val mutations = _edges.flatMap { edge =>
- val (_, edgeUpdate) =
- if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
- else S2Edge.buildOperation(None, Seq(edge))
-
- val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
-
- if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
- io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
- }
-
- writeToStorage(zkQuorum, mutations, withWait).map { ret =>
- _edges.zipWithIndex.map { case (edge, idx) =>
- idx -> ret.isSuccess
- }
- }
- }
-
- def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
- def mutateEdgesInner(edges: Seq[S2EdgeLike],
- checkConsistency: Boolean,
- withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
- assert(edges.nonEmpty)
- // TODO:: remove after code review: unreachable code
- if (!checkConsistency) {
-
- val futures = edges.map { edge =>
- val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
-
- val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
- val mutations =
- io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-
- if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
-
- writeToStorage(zkQuorum, mutations, withWait)
- }
- Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
- } else {
- fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
- conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
- }
- }
- }
-
- val edgeWithIdxs = _edges.zipWithIndex
- val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
- (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
- } toSeq
-
- val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
- val edges = edgeGroup.map(_._1)
- val idxs = edgeGroup.map(_._2)
- // After deleteAll, process others
- val mutateEdgeFutures = edges.toList match {
- case head :: tail =>
- val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
-
- //TODO: decide what we will do on failure on vertex put
- val puts = io.buildVertexPutsAsync(head)
- val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
- Seq(edgeFuture, vertexFuture)
- case Nil => Nil
- }
-
- val composed = for {
- // deleteRet <- Future.sequence(deleteAllFutures)
- mutateRet <- Future.sequence(mutateEdgeFutures)
- } yield mutateRet
-
- composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
- }
-
- Future.sequence(mutateEdges).map { squashedRets =>
- squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
- }
- }
-
- def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
- val futures = for {
- edge <- edges
- } yield {
- val kvs = for {
- relEdge <- edge.relatedEdges
- edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
- } yield {
- val countWithTs = edge.propertyValueInner(LabelMeta.count)
- val countVal = countWithTs.innerVal.toString().toLong
- io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
- }
- writeToStorage(zkQuorum, kvs, withWait = withWait)
- }
-
- Future.sequence(futures)
- }
-
- def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
- val kvs = io.buildDegreePuts(edge, degreeVal)
-
- mutator.writeToStorage(zkQuorum, kvs, withWait = true)
- }
-}