You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/14 12:29:50 UTC

[06/25] incubator-s2graph git commit: Abstract Mutator.

Abstract Mutator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b69421b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b69421b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b69421b0

Branch: refs/heads/master
Commit: b69421b0b40083c5fd98b4f8139a41bf9a8ff55f
Parents: 6f9d852
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu May 3 14:27:48 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu May 3 14:27:48 2018 +0900

----------------------------------------------------------------------
 s2core/build.sbt                                |   3 +-
 .../scala/org/apache/s2graph/core/Mutator.scala |  22 +++
 .../scala/org/apache/s2graph/core/S2Graph.scala |  26 ++-
 .../org/apache/s2graph/core/S2GraphLike.scala   |   4 +
 .../apache/s2graph/core/TraversalHelper.scala   |   4 +-
 .../core/storage/DefaultOptimisticMutator.scala | 171 +++++++++++++++++
 .../core/storage/OptimisticMutator.scala        |  46 +++++
 .../apache/s2graph/core/storage/Storage.scala   |  33 ++--
 .../s2graph/core/storage/StorageWritable.scala  |  64 -------
 .../storage/WriteWriteConflictResolver.scala    |   2 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  |   3 +-
 .../hbase/AsynchbaseStorageWritable.scala       |  11 +-
 .../core/storage/rocks/RocksStorage.scala       |  11 +-
 .../storage/rocks/RocksStorageWritable.scala    |  10 +-
 .../core/storage/serde/MutationHelper.scala     | 190 -------------------
 15 files changed, 302 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 6e062cc..0b83c3d 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -56,7 +56,8 @@ libraryDependencies ++= Seq(
   "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(),
   "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(),
   "org.scala-lang.modules" %% "scala-pickling" % "0.10.1",
-  "com.spotify" % "annoy" % "0.2.5"
+  "com.spotify" % "annoy" % "0.2.5",
+  "org.tensorflow" % "tensorflow" % tensorflowVersion
 )
 
 libraryDependencies := {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
new file mode 100644
index 0000000..a97dcff
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
@@ -0,0 +1,22 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait Mutator {
+  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]]
+
+  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]]
+
+  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]]
+
+  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+                                    requestTs: Long,
+                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean]
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 43ab92c..4b2274a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -260,6 +260,15 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
     else getStorage(label).reader
   }
 
+  override def getMutator(column: ServiceColumn): Mutator = {
+    getStorage(column.service).mutator
+  }
+
+  override def getMutator(label: Label): Mutator = {
+    getStorage(label).mutator
+  }
+
+  //TODO:
   override def flushStorage(): Unit = {
     storagePool.foreach { case (_, storage) =>
 
@@ -315,7 +324,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
     def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike],
                                          withWait: Boolean = false): Future[Seq[MutateResponse]] = {
       val futures = vertices.map { vertex =>
-        storage.mutateVertex(zkQuorum, vertex, withWait)
+        getMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait)
       }
       Future.sequence(futures)
     }
@@ -342,12 +351,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
 
     val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
       val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) =>
-        val storage = getStorage(label)
+        val mutator = getMutator(label)
         val edges = edgeGroup.map(_._1)
         val idxs = edgeGroup.map(_._2)
 
         /* multiple edges with weak consistency level will be processed as batch */
-        storage.mutateWeakEdges(zkQuorum, edges, withWait)
+        mutator.mutateWeakEdges(zkQuorum, edges, withWait)
       }
       Future.sequence(futures)
     }
@@ -360,9 +369,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
     val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) =>
       val edges = edgeGroup.map(_._1)
       val idxs = edgeGroup.map(_._2)
-      val storage = getStorage(label)
+      val mutator = getMutator(label)
       val zkQuorum = label.hbaseZkAddr
-      storage.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
+
+      mutator.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
         idxs.zip(rets)
       }
     }
@@ -487,7 +497,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
   override def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
     val edgesWithIdx = edges.zipWithIndex
     val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
-      getStorage(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+      getMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
     }
 
     Future.sequence(futures).map { ls =>
@@ -497,9 +507,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
 
   override def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = {
     val label = edge.innerLabel
-    val storage = getStorage(label)
+    val mutator = getMutator(label)
 
-    storage.updateDegree(label.hbaseZkAddr, edge, degreeVal)
+    mutator.updateDegree(label.hbaseZkAddr, edge, degreeVal)
   }
 
   override def getVertex(vertexId: VertexId): Option[S2VertexLike] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index fef0078..6ed78b0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -97,6 +97,10 @@ trait S2GraphLike extends Graph {
 
   def getFetcher(label: Label): Fetcher
 
+  def getMutator(label: Label): Mutator
+
+  def getMutator(column: ServiceColumn): Mutator
+
   def flushStorage(): Unit
 
   def shutdown(modelDataDelete: Boolean = false): Unit

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 003a2d1..d19dd1f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -256,7 +256,7 @@ class TraversalHelper(graph: S2GraphLike) {
               */
             graph.mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
           } else {
-            graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+            graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
           }
         case _ =>
 
@@ -264,7 +264,7 @@ class TraversalHelper(graph: S2GraphLike) {
             * read: x
             * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
             */
-          graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+          graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
       }
       ret
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
new file mode 100644
index 0000000..40f29c0
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
@@ -0,0 +1,171 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.LabelMeta
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+abstract class DefaultOptimisticMutator(graph: S2GraphLike,
+                                        serDe: StorageSerDe,
+                                        reader: StorageReadable) extends OptimisticMutator {
+  val fetcher = reader
+
+  lazy val io: StorageIO = new StorageIO(graph, serDe)
+  lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, this, reader)
+
+//  private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+//    mutator.writeToStorage(cluster, kvs, withWait)
+
+  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+                                    requestTs: Long,
+                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
+    if (stepInnerResult.isEmpty) Future.successful(true)
+    else {
+      val head = stepInnerResult.edgeWithScores.head
+      val zkQuorum = head.edge.innerLabel.hbaseZkAddr
+      val futures = for {
+        edgeWithScore <- stepInnerResult.edgeWithScores
+      } yield {
+        val edge = edgeWithScore.edge
+
+        val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+        val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
+          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            io.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        /* reverted direction */
+        val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            io.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
+        writeToStorage(zkQuorum, mutations, withWait = true)
+      }
+
+      Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
+    }
+  }
+
+  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+    if (vertex.op == GraphUtil.operations("delete")) {
+      writeToStorage(zkQuorum,
+        serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
+    } else if (vertex.op == GraphUtil.operations("deleteAll")) {
+      logger.info(s"deleteAll for vertex is truncated. $vertex")
+      Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
+    } else {
+      writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
+    }
+  }
+
+  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
+    val mutations = _edges.flatMap { edge =>
+      val (_, edgeUpdate) =
+        if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
+        else S2Edge.buildOperation(None, Seq(edge))
+
+      val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+
+      if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+      io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+    }
+
+    writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+      _edges.zipWithIndex.map { case (edge, idx) =>
+        idx -> ret.isSuccess
+      }
+    }
+  }
+
+  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+    def mutateEdgesInner(edges: Seq[S2EdgeLike],
+                         checkConsistency: Boolean,
+                         withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+      assert(edges.nonEmpty)
+      // TODO:: remove after code review: unreachable code
+      if (!checkConsistency) {
+
+        val futures = edges.map { edge =>
+          val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+
+          val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+          val mutations =
+            io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+
+          if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+
+          writeToStorage(zkQuorum, mutations, withWait)
+        }
+        Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
+      } else {
+        fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+          conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
+        }
+      }
+    }
+
+    val edgeWithIdxs = _edges.zipWithIndex
+    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+    } toSeq
+
+    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+      val edges = edgeGroup.map(_._1)
+      val idxs = edgeGroup.map(_._2)
+      // After deleteAll, process others
+      val mutateEdgeFutures = edges.toList match {
+        case head :: tail =>
+          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
+
+          //TODO: decide what we will do on failure on vertex put
+          val puts = io.buildVertexPutsAsync(head)
+          val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
+          Seq(edgeFuture, vertexFuture)
+        case Nil => Nil
+      }
+
+      val composed = for {
+        //        deleteRet <- Future.sequence(deleteAllFutures)
+        mutateRet <- Future.sequence(mutateEdgeFutures)
+      } yield mutateRet
+
+      composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
+    }
+
+    Future.sequence(mutateEdges).map { squashedRets =>
+      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+    }
+  }
+
+  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
+    val futures = for {
+      edge <- edges
+    } yield {
+      val kvs = for {
+        relEdge <- edge.relatedEdges
+        edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
+      } yield {
+        val countWithTs = edge.propertyValueInner(LabelMeta.count)
+        val countVal = countWithTs.innerVal.toString().toLong
+        io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
+      }
+      writeToStorage(zkQuorum, kvs, withWait = withWait)
+    }
+
+    Future.sequence(futures)
+  }
+
+  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+    val kvs = io.buildDegreePuts(edge, degreeVal)
+
+    writeToStorage(zkQuorum, kvs, withWait = true)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
new file mode 100644
index 0000000..f9681a9
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
@@ -0,0 +1,46 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core.Mutator
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait OptimisticMutator extends Mutator {
+  /**
+    * decide how to store given key values Seq[SKeyValue] into storage using storage's client.
+    * note that this should be return true on all success.
+    * we assumes that each storage implementation has client as member variable.
+    *
+    * @param cluster  : where this key values should be stored.
+    * @param kvs      : sequence of SKeyValue that need to be stored in storage.
+    * @param withWait : flag to control wait ack from storage.
+    *                 note that in AsynchbaseStorage(which support asynchronous operations), even with true,
+    *                 it never block thread, but rather submit work and notified by event loop when storage send ack back.
+    * @return ack message from storage.
+    */
+  def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+  /**
+    * write requestKeyValue into storage if the current value in storage that is stored matches.
+    * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge.
+    *
+    * Most important thing is this have to be 'atomic' operation.
+    * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be
+    * either blocked or failed on write-write conflict case.
+    *
+    * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to
+    * prevent wrong data for read.
+    *
+    * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction,
+    * compareAndSet to synchronize.
+    *
+    * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
+    * for storage that does not support concurrency control, then storage implementation
+    * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues)
+    * and write(writeLock).
+    *
+    * @param requestKeyValue
+    * @param expectedOpt
+    * @return
+    */
+  def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 6ad62b1..d2500a6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.storage
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.serde.{Deserializable, MutationHelper}
+import org.apache.s2graph.core.storage.serde.Deserializable
 import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable
 import org.apache.s2graph.core.types._
 
@@ -33,9 +33,6 @@ abstract class Storage(val graph: S2GraphLike,
   /* Storage backend specific resource management */
   val management: StorageManagement
 
-  /* Physically store given KeyValue into backend storage. */
-  val mutator: StorageWritable
-
   /*
    * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage
    * then convert them into Edge/Vertex
@@ -50,6 +47,11 @@ abstract class Storage(val graph: S2GraphLike,
   val serDe: StorageSerDe
 
   /*
+   * Responsible to connect physical storage backend to store GraphElement(Edge/Vertex).
+   */
+  val mutator: Mutator
+
+  /*
    * Common helper to translate SKeyValue to Edge/Vertex and vice versa.
    * Note that it require storage backend specific implementation for serialize/deserialize.
    */
@@ -60,16 +62,9 @@ abstract class Storage(val graph: S2GraphLike,
    * Note that it require storage backend specific implementations for
    * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO
    */
-  lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader)
-
-  lazy val mutationHelper: MutationHelper = new MutationHelper(this)
-
-  /** Mutation **/
-  def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutator.writeToStorage(cluster, kvs, withWait)
+//  lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader)
+//  lazy val mutationHelper: MutationHelper = new MutationHelper(this)
 
-  def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutator.writeLock(requestKeyValue, expectedOpt)
 
   /** Fetch **/
   def fetches(queryRequests: Seq[QueryRequest],
@@ -102,21 +97,21 @@ abstract class Storage(val graph: S2GraphLike,
   def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
                                     requestTs: Long,
                                     retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] =
-    mutationHelper.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum)
+    mutator.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum)
 
   def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutationHelper.mutateVertex(zkQuorum: String, vertex, withWait)
+    mutator.mutateVertex(zkQuorum: String, vertex, withWait)
 
   def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] =
-    mutationHelper.mutateStrongEdges(zkQuorum, _edges, withWait)
+    mutator.mutateStrongEdges(zkQuorum, _edges, withWait)
 
 
   def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] =
-    mutationHelper.mutateWeakEdges(zkQuorum, _edges, withWait)
+    mutator.mutateWeakEdges(zkQuorum, _edges, withWait)
 
   def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] =
-    mutationHelper.incrementCounts(zkQuorum, edges, withWait)
+    mutator.incrementCounts(zkQuorum, edges, withWait)
 
   def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutationHelper.updateDegree(zkQuorum, edge, degreeVal)
+    mutator.updateDegree(zkQuorum, edge, degreeVal)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
deleted file mode 100644
index 80da3a9..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait StorageWritable {
-  /**
-    * decide how to store given key values Seq[SKeyValue] into storage using storage's client.
-    * note that this should be return true on all success.
-    * we assumes that each storage implementation has client as member variable.
-    *
-    *
-    * @param cluster: where this key values should be stored.
-    * @param kvs: sequence of SKeyValue that need to be stored in storage.
-    * @param withWait: flag to control wait ack from storage.
-    *                  note that in AsynchbaseStorage(which support asynchronous operations), even with true,
-    *                  it never block thread, but rather submit work and notified by event loop when storage send ack back.
-    * @return ack message from storage.
-    */
-  def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
-
-  /**
-    * write requestKeyValue into storage if the current value in storage that is stored matches.
-    * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge.
-    *
-    * Most important thing is this have to be 'atomic' operation.
-    * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be
-    * either blocked or failed on write-write conflict case.
-    *
-    * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to
-    * prevent wrong data for read.
-    *
-    * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction,
-    * compareAndSet to synchronize.
-    *
-    * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
-    * for storage that does not support concurrency control, then storage implementation
-    * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues)
-    * and write(writeLock).
-    * @param requestKeyValue
-    * @param expectedOpt
-    * @return
-    */
-  def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
index dcef1cc..bfc5bc6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
@@ -31,7 +31,7 @@ import scala.util.Random
 class WriteWriteConflictResolver(graph: S2GraphLike,
                                  serDe: StorageSerDe,
                                  io: StorageIO,
-                                 mutator: StorageWritable,
+                                 mutator: OptimisticMutator,
                                  fetcher: StorageReadable) {
   val BackoffTimeout = graph.BackoffTimeout
   val MaxRetryNum = graph.MaxRetryNum

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index e233277..4be3767 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -151,10 +151,9 @@ class AsynchbaseStorage(override val graph: S2GraphLike,
 
   override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients)
 
-  override val mutator: StorageWritable = new AsynchbaseStorageWritable(client, clientWithFlush)
-
   override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
 
   override val reader: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io)
 
+  override val mutator: Mutator = new AsynchbaseStorageWritable(graph, serDe, reader, client, clientWithFlush)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
index 7ca3782..b4236b9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
@@ -20,14 +20,19 @@
 package org.apache.s2graph.core.storage.hbase
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.storage.{IncrementResponse, MutateResponse, SKeyValue, StorageWritable}
+import org.apache.s2graph.core.S2GraphLike
+import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.utils.{Extensions, logger}
 import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest}
+
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 
-class AsynchbaseStorageWritable(val client: HBaseClient,
-                                val clientWithFlush: HBaseClient) extends StorageWritable {
+class AsynchbaseStorageWritable(val graph: S2GraphLike,
+                                val serDe: StorageSerDe,
+                                val reader: StorageReadable,
+                                val client: HBaseClient,
+                                val clientWithFlush: HBaseClient) extends DefaultOptimisticMutator(graph, serDe, reader) {
   import Extensions.DeferOps
 
   private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
index e53aeb3..b24e375 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
@@ -26,7 +26,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.hash.Hashing
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.Storage
+import org.apache.s2graph.core.storage.{Storage, StorageManagement, StorageReadable, StorageSerDe}
 import org.apache.s2graph.core.storage.rocks.RocksHelper.RocksRPC
 import org.apache.s2graph.core.utils.logger
 import org.rocksdb._
@@ -150,11 +150,12 @@ class RocksStorage(override val graph: S2GraphLike,
     .maximumSize(1000 * 10 * 10 * 10 * 10)
     .build[String, ReentrantLock](cacheLoader)
 
-  override val management = new RocksStorageManagement(config, vdb, db)
+  override val management: StorageManagement = new RocksStorageManagement(config, vdb, db)
 
-  override val mutator = new RocksStorageWritable(db, vdb, lockMap)
+  override val serDe: StorageSerDe = new RocksStorageSerDe(graph)
 
-  override val serDe = new RocksStorageSerDe(graph)
+  override val reader: StorageReadable = new RocksStorageReadable(graph, config, db, vdb, serDe, io)
+
+  override val mutator: Mutator = new RocksStorageWritable(graph, serDe, reader, db, vdb, lockMap)
 
-  override val reader = new RocksStorageReadable(graph, config, db, vdb, serDe, io)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
index 7ec147d..d29ccce 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
@@ -23,15 +23,19 @@ import java.util.concurrent.locks.ReentrantLock
 
 import com.google.common.cache.{Cache, LoadingCache}
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue, StorageWritable}
+import org.apache.s2graph.core.S2GraphLike
+import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.utils.logger
 import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions}
 
 import scala.concurrent.{ExecutionContext, Future}
 
-class RocksStorageWritable(val db: RocksDB,
+class RocksStorageWritable(val graph: S2GraphLike,
+                           val serDe: StorageSerDe,
+                           val reader: StorageReadable,
+                           val db: RocksDB,
                            val vdb: RocksDB,
-                           val lockMap: LoadingCache[String, ReentrantLock]) extends StorageWritable {
+                           val lockMap: LoadingCache[String, ReentrantLock]) extends DefaultOptimisticMutator(graph, serDe, reader) {
 
   override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
     if (kvs.isEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
deleted file mode 100644
index fecc6ea..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.serde
-
-import org.apache.s2graph.core.schema.LabelMeta
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.utils.logger
-
-import scala.concurrent.{ExecutionContext, Future}
-
-class MutationHelper(storage: Storage) {
-  val serDe = storage.serDe
-  val io = storage.io
-  val fetcher = storage.reader
-  val mutator = storage.mutator
-  val conflictResolver = storage.conflictResolver
-
-  private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutator.writeToStorage(cluster, kvs, withWait)
-
-  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
-                                    requestTs: Long,
-                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
-    if (stepInnerResult.isEmpty) Future.successful(true)
-    else {
-      val head = stepInnerResult.edgeWithScores.head
-      val zkQuorum = head.edge.innerLabel.hbaseZkAddr
-      val futures = for {
-        edgeWithScore <- stepInnerResult.edgeWithScores
-      } yield {
-        val edge = edgeWithScore.edge
-
-        val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
-        val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-
-        val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
-        val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
-          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
-            io.buildIncrementsAsync(indexEdge, -1L)
-        }
-
-        /* reverted direction */
-        val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
-        val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
-          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
-            io.buildIncrementsAsync(indexEdge, -1L)
-        }
-
-        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-
-        writeToStorage(zkQuorum, mutations, withWait = true)
-      }
-
-      Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
-    }
-  }
-
-  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
-    if (vertex.op == GraphUtil.operations("delete")) {
-      writeToStorage(zkQuorum,
-        serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
-    } else if (vertex.op == GraphUtil.operations("deleteAll")) {
-      logger.info(s"deleteAll for vertex is truncated. $vertex")
-      Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
-    } else {
-      writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
-    }
-  }
-
-  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
-    val mutations = _edges.flatMap { edge =>
-      val (_, edgeUpdate) =
-        if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
-        else S2Edge.buildOperation(None, Seq(edge))
-
-      val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
-
-      if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
-      io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-    }
-
-    writeToStorage(zkQuorum, mutations, withWait).map { ret =>
-      _edges.zipWithIndex.map { case (edge, idx) =>
-        idx -> ret.isSuccess
-      }
-    }
-  }
-
-  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
-    def mutateEdgesInner(edges: Seq[S2EdgeLike],
-                         checkConsistency: Boolean,
-                         withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
-      assert(edges.nonEmpty)
-      // TODO:: remove after code review: unreachable code
-      if (!checkConsistency) {
-
-        val futures = edges.map { edge =>
-          val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
-
-          val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
-          val mutations =
-            io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-
-          if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
-
-          writeToStorage(zkQuorum, mutations, withWait)
-        }
-        Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
-      } else {
-        fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
-          conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
-        }
-      }
-    }
-
-    val edgeWithIdxs = _edges.zipWithIndex
-    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
-      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
-    } toSeq
-
-    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
-      val edges = edgeGroup.map(_._1)
-      val idxs = edgeGroup.map(_._2)
-      // After deleteAll, process others
-      val mutateEdgeFutures = edges.toList match {
-        case head :: tail =>
-          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
-
-          //TODO: decide what we will do on failure on vertex put
-          val puts = io.buildVertexPutsAsync(head)
-          val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
-          Seq(edgeFuture, vertexFuture)
-        case Nil => Nil
-      }
-
-      val composed = for {
-      //        deleteRet <- Future.sequence(deleteAllFutures)
-        mutateRet <- Future.sequence(mutateEdgeFutures)
-      } yield mutateRet
-
-      composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
-    }
-
-    Future.sequence(mutateEdges).map { squashedRets =>
-      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
-    }
-  }
-
-  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
-    val futures = for {
-      edge <- edges
-    } yield {
-      val kvs = for {
-        relEdge <- edge.relatedEdges
-        edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
-      } yield {
-        val countWithTs = edge.propertyValueInner(LabelMeta.count)
-        val countVal = countWithTs.innerVal.toString().toLong
-        io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
-      }
-      writeToStorage(zkQuorum, kvs, withWait = withWait)
-    }
-
-    Future.sequence(futures)
-  }
-
-  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
-    val kvs = io.buildDegreePuts(edge, degreeVal)
-
-    mutator.writeToStorage(zkQuorum, kvs, withWait = true)
-  }
-}