You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/11 03:05:44 UTC

[04/11] incubator-s2graph git commit: separate Storage into multiple small interfaces such as EdgeFetcher/VertexMutator, ...

separate Storage into multiple small interfaces such as EdgeFetcher/VertexMutator, ...


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/43f627e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/43f627e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/43f627e5

Branch: refs/heads/master
Commit: 43f627e551fd0b744a858c9e6e7feba7fd68e58c
Parents: 2357d81
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed May 9 15:58:19 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed May 9 16:56:37 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/core/EdgeBulkFetcher.scala   |  28 ++
 .../org/apache/s2graph/core/EdgeFetcher.scala   |  35 ++
 .../org/apache/s2graph/core/EdgeMutator.scala   |  38 ++
 .../scala/org/apache/s2graph/core/Fetcher.scala |  36 --
 .../org/apache/s2graph/core/Management.scala    |   4 +-
 .../scala/org/apache/s2graph/core/Mutator.scala |  41 ---
 .../scala/org/apache/s2graph/core/S2Graph.scala |  51 ++-
 .../org/apache/s2graph/core/S2GraphLike.scala   |  25 +-
 .../apache/s2graph/core/TraversalHelper.scala   |   6 +-
 .../apache/s2graph/core/VertexBulkFetcher.scala |  26 ++
 .../org/apache/s2graph/core/VertexFetcher.scala |  31 ++
 .../org/apache/s2graph/core/VertexMutator.scala |  28 ++
 .../s2graph/core/fetcher/FetcherManager.scala   | 106 ++++++
 .../core/fetcher/MemoryModelEdgeFetcher.scala   |  54 +++
 .../s2graph/core/model/ImportStatus.scala       |  59 ---
 .../apache/s2graph/core/model/Importer.scala    | 122 ------
 .../s2graph/core/model/MemoryModelFetcher.scala |  59 ---
 .../s2graph/core/model/ModelManager.scala       | 103 ------
 .../core/storage/DefaultOptimisticMutator.scala | 190 ++++++++++
 .../core/storage/OptimisticEdgeFetcher.scala    |  56 +++
 .../core/storage/OptimisticMutator.scala        |  63 ++++
 .../apache/s2graph/core/storage/Storage.scala   |  72 +---
 .../s2graph/core/storage/StorageReadable.scala  |  49 +--
 .../s2graph/core/storage/StorageWritable.scala  |  65 ----
 .../storage/WriteWriteConflictResolver.scala    |   6 +-
 .../hbase/AsynchbaseEdgeBulkFetcher.scala       |  69 ++++
 .../storage/hbase/AsynchbaseEdgeFetcher.scala   | 120 ++++++
 .../hbase/AsynchbaseOptimisticEdgeFetcher.scala |  35 ++
 .../hbase/AsynchbaseOptimisticMutator.scala     | 142 +++++++
 .../core/storage/hbase/AsynchbaseStorage.scala  | 188 +++++++++-
 .../hbase/AsynchbaseStorageReadable.scala       | 367 -------------------
 .../hbase/AsynchbaseStorageWritable.scala       | 142 -------
 .../hbase/AsynchbaseVertexBulkFetcher.scala     |  63 ++++
 .../storage/hbase/AsynchbaseVertexFetcher.scala |  61 +++
 .../storage/rocks/RocksEdgeBulkFetcher.scala    |  68 ++++
 .../core/storage/rocks/RocksEdgeFetcher.scala   |  60 +++
 .../rocks/RocksOptimisticEdgeFetcher.scala      |  41 +++
 .../storage/rocks/RocksOptimisticMutator.scala  | 133 +++++++
 .../core/storage/rocks/RocksStorage.scala       | 104 +++++-
 .../storage/rocks/RocksStorageReadable.scala    | 234 ------------
 .../storage/rocks/RocksStorageWritable.scala    | 133 -------
 .../storage/rocks/RocksVertexBulkFetcher.scala  |  88 +++++
 .../core/storage/rocks/RocksVertexFetcher.scala |  61 +++
 .../core/storage/serde/MutationHelper.scala     | 188 ----------
 .../s2graph/core/utils/ImportStatus.scala       |  59 +++
 .../apache/s2graph/core/utils/Importer.scala    | 122 ++++++
 .../s2graph/core/fetcher/EdgeFetcherTest.scala  |  87 +++++
 .../apache/s2graph/core/model/FetcherTest.scala |  87 -----
 48 files changed, 2244 insertions(+), 1761 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
new file mode 100644
index 0000000..646f5f4
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import com.typesafe.config.Config
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait EdgeBulkFetcher {
+  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
new file mode 100644
index 0000000..f28a161
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.types.VertexId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait EdgeFetcher {
+
+  def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
+
+  def fetches(queryRequests: Seq[QueryRequest],
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
+
+  def close(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
new file mode 100644
index 0000000..dc0099e
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.storage.MutateResponse
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait EdgeMutator {
+  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]]
+
+  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]]
+
+  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]]
+
+  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+                                    requestTs: Long,
+                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean]
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
deleted file mode 100644
index 57d2f29..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.types.VertexId
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait Fetcher {
-
-  def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] =
-    Future.successful(this)
-
-  def fetches(queryRequests: Seq[QueryRequest],
-              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
-
-  def close(): Unit = {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 7ff5a9e..9046449 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core
 
-import java.util
+
 import java.util.concurrent.Executors
 
 import com.typesafe.config.{Config, ConfigFactory}
@@ -29,7 +29,7 @@ import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types.HBaseType._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.model.Importer
+import org.apache.s2graph.core.utils.Importer
 import play.api.libs.json._
 
 import scala.concurrent.{ExecutionContext, Future}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
deleted file mode 100644
index 53161e1..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait Mutator {
-  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
-
-  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]]
-
-  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]]
-
-  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]]
-
-  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse]
-
-  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
-                                    requestTs: Long,
-                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean]
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 4b2274a..c4cb48f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -27,11 +27,11 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.commons.configuration.{BaseConfiguration, Configuration}
 import org.apache.s2graph.core.index.IndexProvider
 import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
-import org.apache.s2graph.core.model.ModelManager
+import org.apache.s2graph.core.fetcher.FetcherManager
 import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.storage.rocks.RocksStorage
-import org.apache.s2graph.core.storage.{MutateResponse, Storage}
+import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
 import org.apache.tinkerpop.gremlin.process.traversal.{P, TraversalStrategies}
@@ -187,7 +187,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
 
   override val management = new Management(this)
 
-  override val modelManager = new ModelManager(this)
+  override val modelManager = new FetcherManager(this)
 
   override val indexProvider = IndexProvider.apply(config)
 
@@ -251,21 +251,34 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
   }
 
   //TODO:
-  override def getFetcher(column: ServiceColumn): Fetcher = {
-    getStorage(column.service).reader
+  override def getVertexFetcher(column: ServiceColumn): VertexFetcher = {
+    getStorage(column.service).vertexFetcher
+  }
+  override def getVertexBulkFetcher: VertexBulkFetcher = {
+    defaultStorage.vertexBulkFetcher
   }
 
-  override def getFetcher(label: Label): Fetcher = {
+  override def getEdgeFetcher(label: Label): EdgeFetcher = {
     if (label.fetchConfigExist) modelManager.getFetcher(label)
-    else getStorage(label).reader
+    else getStorage(label).edgeFetcher
+  }
+
+  override def getEdgeBulkFetcher: EdgeBulkFetcher = {
+    defaultStorage.edgeBulkFetcher
   }
 
-  override def getMutator(column: ServiceColumn): Mutator = {
-    getStorage(column.service).mutator
+  override def getVertexMutator(column: ServiceColumn): VertexMutator = {
+    getStorage(column.service).vertexMutator
   }
 
-  override def getMutator(label: Label): Mutator = {
-    getStorage(label).mutator
+  override def getEdgeMutator(label: Label): EdgeMutator = {
+    getStorage(label).edgeMutator
+  }
+
+  /** optional */
+  override def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher = {
+//    getStorage(label).optimisticEdgeFetcher
+    null
   }
 
   //TODO:
@@ -296,8 +309,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
 
   override def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = {
     val verticesWithIdx = vertices.zipWithIndex
-    val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
-      getStorage(service).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2)))
+    val futures = verticesWithIdx.groupBy { case (v, idx) => v.serviceColumn }.map { case (serviceColumn, vertexGroup) =>
+      getVertexFetcher(serviceColumn).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2)))
     }
 
     Future.sequence(futures).map { ls =>
@@ -309,7 +322,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
     val futures = for {
       edge <- edges
     } yield {
-      getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (edgeOpt, _) =>
+      getOptimisticEdgeFetcher(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (edgeOpt, _) =>
         edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, edge.innerLabel))
       }
     }
@@ -324,7 +337,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
     def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike],
                                          withWait: Boolean = false): Future[Seq[MutateResponse]] = {
       val futures = vertices.map { vertex =>
-        getMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait)
+        getVertexMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait)
       }
       Future.sequence(futures)
     }
@@ -351,7 +364,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
 
     val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
       val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) =>
-        val mutator = getMutator(label)
+        val mutator = getEdgeMutator(label)
         val edges = edgeGroup.map(_._1)
         val idxs = edgeGroup.map(_._2)
 
@@ -369,7 +382,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
     val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) =>
       val edges = edgeGroup.map(_._1)
       val idxs = edgeGroup.map(_._2)
-      val mutator = getMutator(label)
+      val mutator = getEdgeMutator(label)
       val zkQuorum = label.hbaseZkAddr
 
       mutator.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
@@ -497,7 +510,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
   override def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
     val edgesWithIdx = edges.zipWithIndex
     val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
-      getMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+      getEdgeMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
     }
 
     Future.sequence(futures).map { ls =>
@@ -507,7 +520,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
 
   override def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = {
     val label = edge.innerLabel
-    val mutator = getMutator(label)
+    val mutator = getEdgeMutator(label)
 
     mutator.updateDegree(label.hbaseZkAddr, edge, degreeVal)
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index 6ed78b0..5e2c168 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -31,9 +31,9 @@ import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
 import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
 import org.apache.s2graph.core.index.IndexProvider
-import org.apache.s2graph.core.model.ModelManager
+import org.apache.s2graph.core.fetcher.FetcherManager
 import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn}
-import org.apache.s2graph.core.storage.{MutateResponse, Storage}
+import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage}
 import org.apache.s2graph.core.types.{InnerValLike, VertexId}
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
 import org.apache.tinkerpop.gremlin.structure
@@ -69,7 +69,7 @@ trait S2GraphLike extends Graph {
 
   val traversalHelper: TraversalHelper
 
-  val modelManager: ModelManager
+  val modelManager: FetcherManager
 
   lazy val MaxRetryNum: Int = config.getInt("max.retry.number")
   lazy val MaxBackOff: Int = config.getInt("max.back.off")
@@ -93,13 +93,20 @@ trait S2GraphLike extends Graph {
 
   def getStorage(label: Label): Storage
 
-  def getFetcher(column: ServiceColumn): Fetcher
+  def getVertexFetcher(column: ServiceColumn): VertexFetcher
 
-  def getFetcher(label: Label): Fetcher
+  def getVertexBulkFetcher(): VertexBulkFetcher
 
-  def getMutator(label: Label): Mutator
+  def getEdgeFetcher(label: Label): EdgeFetcher
 
-  def getMutator(column: ServiceColumn): Mutator
+  def getEdgeBulkFetcher(): EdgeBulkFetcher
+
+  /** optional */
+  def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher
+
+  def getEdgeMutator(label: Label): EdgeMutator
+
+  def getVertexMutator(column: ServiceColumn): VertexMutator
 
   def flushStorage(): Unit
 
@@ -204,7 +211,7 @@ trait S2GraphLike extends Graph {
 
     if (ids.isEmpty) {
       //TODO: default storage need to be fixed.
-      Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator
+      Await.result(getVertexBulkFetcher().fetchVerticesAll(), WaitTimeout).iterator
     } else {
       val vertices = ids.collect {
         case s2Vertex: S2VertexLike => s2Vertex
@@ -229,7 +236,7 @@ trait S2GraphLike extends Graph {
   def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
     if (edgeIds.isEmpty) {
       // FIXME
-      Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
+      Await.result(getEdgeBulkFetcher().fetchEdgesAll(), WaitTimeout).iterator
     } else {
       Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index d19dd1f..0a4a49b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -204,7 +204,7 @@ class TraversalHelper(graph: S2GraphLike) {
     val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
       for {
         prev <- prevFuture
-        cur <- graph.getFetcher(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
+        cur <- graph.getEdgeFetcher(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
       } yield {
         prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
       }
@@ -256,7 +256,7 @@ class TraversalHelper(graph: S2GraphLike) {
               */
             graph.mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
           } else {
-            graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+            graph.getEdgeMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
           }
         case _ =>
 
@@ -264,7 +264,7 @@ class TraversalHelper(graph: S2GraphLike) {
             * read: x
             * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
             */
-          graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+          graph.getEdgeMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
       }
       ret
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
new file mode 100644
index 0000000..cbebab5
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait VertexBulkFetcher {
+  def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
new file mode 100644
index 0000000..5c10d18
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.types.VertexId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait VertexFetcher {
+  def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
+  def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
+  def close(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
new file mode 100644
index 0000000..18be890
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.storage.MutateResponse
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait VertexMutator {
+  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
new file mode 100644
index 0000000..26db7ff
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.fetcher
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.utils.{Importer, logger}
+import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object FetcherManager {
+  val ClassNameKey = "className"
+}
+
+class FetcherManager(s2GraphLike: S2GraphLike) {
+
+  import FetcherManager._
+
+  private val fetcherPool = scala.collection.mutable.Map.empty[String, EdgeFetcher]
+
+  private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer]
+
+  def toImportLockKey(label: Label): String = label.label
+
+  def getFetcher(label: Label): EdgeFetcher = {
+    fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported."))
+  }
+
+  def initImporter(config: Config): Importer = {
+    val className = config.getString(ClassNameKey)
+
+    Class.forName(className)
+      .getConstructor(classOf[S2GraphLike])
+      .newInstance(s2GraphLike)
+      .asInstanceOf[Importer]
+  }
+
+  def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[EdgeFetcher] = {
+    val className = config.getString(ClassNameKey)
+
+    val fetcher = Class.forName(className)
+      .getConstructor(classOf[S2GraphLike])
+      .newInstance(s2GraphLike)
+      .asInstanceOf[EdgeFetcher]
+
+    fetcher.init(config)
+
+    Future.successful(fetcher)
+  }
+
+  def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
+    val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] {
+      override def apply(k: String): Importer = {
+        val importer = initImporter(config.getConfig("importer"))
+
+        //TODO: Update Label's extra options.
+        importer
+          .run(config.getConfig("importer"))
+          .map { importer =>
+            logger.info(s"Close importer")
+            importer.close()
+
+            initFetcher(config.getConfig("fetcher")).map { fetcher =>
+              importer.setStatus(true)
+
+              fetcherPool
+                .remove(k)
+                .foreach { oldFetcher =>
+                  logger.info(s"Delete old storage ($k) => $oldFetcher")
+                  oldFetcher.close()
+                }
+
+              fetcherPool += (k -> fetcher)
+            }
+          }
+          .onComplete { _ =>
+            logger.info(s"ImportLock release: $k")
+            ImportLock.remove(k)
+          }
+
+        importer
+      }
+    })
+
+    Future.successful(importer)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
new file mode 100644
index 0000000..bf90d69
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.fetcher
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.types.VertexId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+  * Reference implementation for Fetcher interface.
+  * it only produce constant edges.
+  */
+class MemoryModelEdgeFetcher(val graph: S2GraphLike) extends EdgeFetcher {
+  val builder = graph.elementBuilder
+  val ranges = (0 until 10)
+
+
+  override def fetches(queryRequests: Seq[QueryRequest],
+                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
+    val stepResultLs = queryRequests.map { queryRequest =>
+      val queryParam = queryRequest.queryParam
+      val edges = ranges.map { ith =>
+        val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString)
+
+        graph.toEdge(queryRequest.vertex.innerIdVal,
+          tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction)
+      }
+
+      val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+      StepResult(edgeWithScores, Nil, Nil)
+    }
+
+    Future.successful(stepResultLs)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
deleted file mode 100644
index 189a6d0..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.model
-
-import java.util.concurrent.atomic.AtomicInteger
-
-trait ImportStatus {
-  val done: AtomicInteger
-
-  def isCompleted: Boolean
-
-  def percentage: Int
-
-  val total: Int
-}
-
-class ImportRunningStatus(val total: Int) extends ImportStatus {
-  require(total > 0, s"Total should be positive: $total")
-
-  val done = new AtomicInteger(0)
-
-  def isCompleted: Boolean = total == done.get
-
-  def percentage = 100 * done.get / total
-}
-
-case object ImportDoneStatus extends ImportStatus {
-  val total = 1
-
-  val done = new AtomicInteger(1)
-
-  def isCompleted: Boolean = true
-
-  def percentage = 100
-}
-
-object ImportStatus {
-  def apply(total: Int): ImportStatus = new ImportRunningStatus(total)
-
-  def unapply(importResult: ImportStatus): Option[(Boolean, Int, Int)] =
-    Some((importResult.isCompleted, importResult.total, importResult.done.get))
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
deleted file mode 100644
index e3084dd..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.model
-
-import java.io.File
-
-import com.typesafe.config.Config
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.s2graph.core.{Fetcher, S2GraphLike}
-import org.apache.s2graph.core.utils.logger
-
-import scala.concurrent.{ExecutionContext, Future}
-
-object Importer {
-  def toHDFSConfiguration(hdfsConfDir: String): Configuration = {
-    val conf = new Configuration
-
-    val hdfsConfDirectory = new File(hdfsConfDir)
-    if (hdfsConfDirectory.exists()) {
-      if (!hdfsConfDirectory.isDirectory || !hdfsConfDirectory.canRead) {
-        throw new IllegalStateException(s"HDFS configuration directory ($hdfsConfDirectory) cannot be read.")
-      }
-
-      val path = hdfsConfDirectory.getAbsolutePath
-      conf.addResource(new Path(s"file:///$path/core-site.xml"))
-      conf.addResource(new Path(s"file:///$path/hdfs-site.xml"))
-    } else {
-      logger.warn("RocksDBImporter doesn't have valid hadoop configuration directory..")
-    }
-    conf
-  }
-}
-
-trait Importer {
-  @volatile var isFinished: Boolean = false
-
-  def run(config: Config)(implicit ec: ExecutionContext): Future[Importer]
-
-  def status: Boolean = isFinished
-
-  def setStatus(otherStatus: Boolean): Boolean = {
-    this.isFinished = otherStatus
-    this.isFinished
-  }
-
-  def close(): Unit
-}
-
-case class IdentityImporter(graph: S2GraphLike) extends Importer {
-  override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
-    Future.successful(this)
-  }
-
-  override def close(): Unit = {}
-}
-
-object HDFSImporter {
-
-  import scala.collection.JavaConverters._
-
-  val PathsKey = "paths"
-  val HDFSConfDirKey = "hdfsConfDir"
-
-  def extractPaths(config: Config): Map[String, String] = {
-    config.getConfigList(PathsKey).asScala.map { e =>
-      val key = e.getString("src")
-      val value = e.getString("tgt")
-
-      key -> value
-    }.toMap
-  }
-}
-
-case class HDFSImporter(graph: S2GraphLike) extends Importer {
-
-  import HDFSImporter._
-
-  override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
-    Future {
-      val paths = extractPaths(config)
-      val hdfsConfiDir = config.getString(HDFSConfDirKey)
-
-      val hadoopConfig = Importer.toHDFSConfiguration(hdfsConfiDir)
-      val fs = FileSystem.get(hadoopConfig)
-
-      def copyToLocal(remoteSrc: String, localSrc: String): Unit = {
-        val remoteSrcPath = new Path(remoteSrc)
-        val localSrcPath = new Path(localSrc)
-
-        fs.copyToLocalFile(remoteSrcPath, localSrcPath)
-      }
-
-      paths.foreach { case (srcPath, tgtPath) =>
-        copyToLocal(srcPath, tgtPath)
-      }
-
-      this
-    }
-  }
-
-  //  override def status: ImportStatus = ???
-
-  override def close(): Unit = {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
deleted file mode 100644
index 2130066..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.model
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.types.VertexId
-
-import scala.concurrent.{ExecutionContext, Future}
-
-/**
-  * Reference implementation for Fetcher interface.
-  * it only produce constant edges.
-  */
-class MemoryModelFetcher(val graph: S2GraphLike) extends Fetcher {
-  val builder = graph.elementBuilder
-  val ranges = (0 until 10)
-
-  override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = {
-    Future.successful(this)
-  }
-
-  override def fetches(queryRequests: Seq[QueryRequest],
-                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
-    val stepResultLs = queryRequests.map { queryRequest =>
-      val queryParam = queryRequest.queryParam
-      val edges = ranges.map { ith =>
-        val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString)
-
-        graph.toEdge(queryRequest.vertex.innerIdVal,
-          tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction)
-      }
-
-      val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
-      StepResult(edgeWithScores, Nil, Nil)
-    }
-
-    Future.successful(stepResultLs)
-  }
-
-  override def close(): Unit = {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
deleted file mode 100644
index 3cad13c..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.model
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{Fetcher, S2GraphLike}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-object ModelManager {
-  val ClassNameKey = "className"
-}
-
-class ModelManager(s2GraphLike: S2GraphLike) {
-
-  import ModelManager._
-
-  private val fetcherPool = scala.collection.mutable.Map.empty[String, Fetcher]
-  private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer]
-
-  def toImportLockKey(label: Label): String = label.label
-
-  def getFetcher(label: Label): Fetcher = {
-    fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported."))
-  }
-
-  def initImporter(config: Config): Importer = {
-    val className = config.getString(ClassNameKey)
-
-    Class.forName(className)
-      .getConstructor(classOf[S2GraphLike])
-      .newInstance(s2GraphLike)
-      .asInstanceOf[Importer]
-  }
-
-  def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = {
-    val className = config.getString(ClassNameKey)
-
-    val fetcher = Class.forName(className)
-      .getConstructor(classOf[S2GraphLike])
-      .newInstance(s2GraphLike)
-      .asInstanceOf[Fetcher]
-
-    fetcher.init(config)
-  }
-
-  def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
-    val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] {
-      override def apply(k: String): Importer = {
-        val importer = initImporter(config.getConfig("importer"))
-
-        //TODO: Update Label's extra options.
-        importer
-          .run(config.getConfig("importer"))
-          .map { importer =>
-            logger.info(s"Close importer")
-            importer.close()
-
-            initFetcher(config.getConfig("fetcher")).map { fetcher =>
-              importer.setStatus(true)
-
-              fetcherPool
-                .remove(k)
-                .foreach { oldFetcher =>
-                  logger.info(s"Delete old storage ($k) => $oldFetcher")
-                  oldFetcher.close()
-                }
-
-              fetcherPool += (k -> fetcher)
-            }
-          }
-          .onComplete { _ =>
-            logger.info(s"ImportLock release: $k")
-            ImportLock.remove(k)
-          }
-
-        importer
-      }
-    })
-
-    Future.successful(importer)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
new file mode 100644
index 0000000..82cc27a
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.LabelMeta
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class DefaultOptimisticMutator(graph: S2GraphLike,
+                               serDe: StorageSerDe,
+                               optimisticEdgeFetcher: OptimisticEdgeFetcher,
+                               optimisticMutator: OptimisticMutator) extends VertexMutator with EdgeMutator {
+
+  lazy val io: StorageIO = new StorageIO(graph, serDe)
+
+  lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, optimisticMutator, optimisticEdgeFetcher)
+
+  private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+    optimisticMutator.writeToStorage(cluster, kvs, withWait)
+
+  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+                                    requestTs: Long,
+                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
+    if (stepInnerResult.isEmpty) Future.successful(true)
+    else {
+      val head = stepInnerResult.edgeWithScores.head
+      val zkQuorum = head.edge.innerLabel.hbaseZkAddr
+      val futures = for {
+        edgeWithScore <- stepInnerResult.edgeWithScores
+      } yield {
+        val edge = edgeWithScore.edge
+
+        val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+        val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
+          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            io.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        /* reverted direction */
+        val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            io.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
+        writeToStorage(zkQuorum, mutations, withWait = true)
+      }
+
+      Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
+    }
+  }
+
+  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+    if (vertex.op == GraphUtil.operations("delete")) {
+      writeToStorage(zkQuorum,
+        serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
+    } else if (vertex.op == GraphUtil.operations("deleteAll")) {
+      logger.info(s"deleteAll for vertex is truncated. $vertex")
+      Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
+    } else {
+      writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
+    }
+  }
+
+  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
+    val mutations = _edges.flatMap { edge =>
+      val (_, edgeUpdate) =
+        if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
+        else S2Edge.buildOperation(None, Seq(edge))
+
+      val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+
+      if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+      io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+    }
+
+    writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+      _edges.zipWithIndex.map { case (edge, idx) =>
+        idx -> ret.isSuccess
+      }
+    }
+  }
+
+  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+    def mutateEdgesInner(edges: Seq[S2EdgeLike],
+                         checkConsistency: Boolean,
+                         withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+      assert(edges.nonEmpty)
+      // TODO:: remove after code review: unreachable code
+      if (!checkConsistency) {
+
+        val futures = edges.map { edge =>
+          val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+
+          val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+          val mutations =
+            io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+
+          if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+
+          writeToStorage(zkQuorum, mutations, withWait)
+        }
+        Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
+      } else {
+        optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+          conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
+        }
+      }
+    }
+
+    val edgeWithIdxs = _edges.zipWithIndex
+    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+    } toSeq
+
+    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+      val edges = edgeGroup.map(_._1)
+      val idxs = edgeGroup.map(_._2)
+      // After deleteAll, process others
+      val mutateEdgeFutures = edges.toList match {
+        case head :: tail =>
+          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
+
+          //TODO: decide what we will do on failure on vertex put
+          val puts = io.buildVertexPutsAsync(head)
+          val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
+          Seq(edgeFuture, vertexFuture)
+        case Nil => Nil
+      }
+
+      val composed = for {
+        //        deleteRet <- Future.sequence(deleteAllFutures)
+        mutateRet <- Future.sequence(mutateEdgeFutures)
+      } yield mutateRet
+
+      composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
+    }
+
+    Future.sequence(mutateEdges).map { squashedRets =>
+      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+    }
+  }
+
+  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
+    val futures = for {
+      edge <- edges
+    } yield {
+      val kvs = for {
+        relEdge <- edge.relatedEdges
+        edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
+      } yield {
+        val countWithTs = edge.propertyValueInner(LabelMeta.count)
+        val countVal = countWithTs.innerVal.toString().toLong
+        io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
+      }
+      writeToStorage(zkQuorum, kvs, withWait = withWait)
+    }
+
+    Future.sequence(futures)
+  }
+
+  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+    val kvs = io.buildDegreePuts(edge, degreeVal)
+
+    writeToStorage(zkQuorum, kvs, withWait = true)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala
new file mode 100644
index 0000000..4111cc4
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait OptimisticEdgeFetcher {
+  val io: StorageIO
+  protected def fetchKeyValues(queryRequest: QueryRequest,
+                               edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+
+  def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
+    val queryParam = QueryParam(labelName = edge.innerLabel.label,
+      direction = GraphUtil.fromDirection(edge.getDir()),
+      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
+      cacheTTLInMillis = -1)
+    val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
+    val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
+
+    fetchKeyValues(queryRequest, edge).map { kvs =>
+      val (edgeOpt, kvOpt) =
+        if (kvs.isEmpty) (None, None)
+        else {
+          import CanSKeyValue._
+          val snapshotEdgeOpt = io.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
+          val _kvOpt = kvs.headOption
+          (snapshotEdgeOpt, _kvOpt)
+        }
+      (edgeOpt, kvOpt)
+    } recoverWith { case ex: Throwable =>
+      logger.error(s"fetchQueryParam failed. fallback return.", ex)
+      throw new FetchTimeoutException(s"${edge.toLogString}")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
new file mode 100644
index 0000000..22269df
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait OptimisticMutator {
+  /**
+    * decide how to store given key values Seq[SKeyValue] into storage using storage's client.
+    * note that this should be return true on all success.
+    * we assumes that each storage implementation has client as member variable.
+    *
+    * @param cluster  : where this key values should be stored.
+    * @param kvs      : sequence of SKeyValue that need to be stored in storage.
+    * @param withWait : flag to control wait ack from storage.
+    *                 note that in AsynchbaseStorage(which support asynchronous operations), even with true,
+    *                 it never block thread, but rather submit work and notified by event loop when storage send ack back.
+    * @return ack message from storage.
+    */
+  def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+  /**
+    * write requestKeyValue into storage if the current value in storage that is stored matches.
+    * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge.
+    *
+    * Most important thing is this have to be 'atomic' operation.
+    * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be
+    * either blocked or failed on write-write conflict case.
+    *
+    * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to
+    * prevent wrong data for read.
+    *
+    * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction,
+    * compareAndSet to synchronize.
+    *
+    * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
+    * for storage that does not support concurrency control, then storage implementation
+    * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues)
+    * and write(writeLock).
+    *
+    * @param requestKeyValue
+    * @param expectedOpt
+    * @return
+    */
+  def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index d2500a6..36ecfcb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -34,52 +34,30 @@ abstract class Storage(val graph: S2GraphLike,
   val management: StorageManagement
 
   /*
-   * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage
-   * then convert them into Edge/Vertex
-   */
-  val reader: StorageReadable
-
-  /*
-   * Serialize Edge/Vertex, to common KeyValue, SKeyValue that
-   * can be stored aligned to backend storage's physical schema.
-   * Also Deserialize storage backend's KeyValue to SKeyValue.
-   */
+     * Serialize Edge/Vertex, to common KeyValue, SKeyValue that
+     * can be stored aligned to backend storage's physical schema.
+     * Also Deserialize storage backend's KeyValue to SKeyValue.
+     */
   val serDe: StorageSerDe
 
-  /*
-   * Responsible to connect physical storage backend to store GraphElement(Edge/Vertex).
-   */
-  val mutator: Mutator
+  val edgeFetcher: EdgeFetcher
 
-  /*
-   * Common helper to translate SKeyValue to Edge/Vertex and vice versa.
-   * Note that it require storage backend specific implementation for serialize/deserialize.
-   */
-  lazy val io: StorageIO = new StorageIO(graph, serDe)
+  val edgeBulkFetcher: EdgeBulkFetcher
 
-  /*
-   * Common helper to resolve write-write conflict on snapshot edge with same EdgeId.
-   * Note that it require storage backend specific implementations for
-   * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO
-   */
-//  lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader)
-//  lazy val mutationHelper: MutationHelper = new MutationHelper(this)
+  val vertexFetcher: VertexFetcher
 
+  val vertexBulkFetcher: VertexBulkFetcher
 
-  /** Fetch **/
-  def fetches(queryRequests: Seq[QueryRequest],
-              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] =
-    reader.fetches(queryRequests, prevStepEdges)
+  val edgeMutator: EdgeMutator
 
-  def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] =
-    reader.fetchVertices(vertices)
+  val vertexMutator: VertexMutator
 
-  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = reader.fetchEdgesAll()
-
-  def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = reader.fetchVerticesAll()
+  /*
+   * Common helper to translate SKeyValue to Edge/Vertex and vice versa.
+   * Note that it require storage backend specific implementation for serialize/deserialize.
+   */
+  lazy val io: StorageIO = new StorageIO(graph, serDe)
 
-  def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] =
-    reader.fetchSnapshotEdgeInner(edge)
 
   /** Management **/
   def flush(): Unit = management.flush()
@@ -94,24 +72,4 @@ abstract class Storage(val graph: S2GraphLike,
 
   def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName)
 
-  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
-                                    requestTs: Long,
-                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] =
-    mutator.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum)
-
-  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutator.mutateVertex(zkQuorum: String, vertex, withWait)
-
-  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] =
-    mutator.mutateStrongEdges(zkQuorum, _edges, withWait)
-
-
-  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] =
-    mutator.mutateWeakEdges(zkQuorum, _edges, withWait)
-
-  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] =
-    mutator.incrementCounts(zkQuorum, edges, withWait)
-
-  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutator.updateDegree(zkQuorum, edge, degreeVal)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index b10feb9..c3abd03 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -27,7 +27,7 @@ import org.apache.s2graph.core.utils.logger
 
 import scala.concurrent.{ExecutionContext, Future}
 
-trait StorageReadable extends Fetcher {
+trait StorageReadable extends EdgeFetcher {
   val io: StorageIO
   val serDe: StorageSerDe
 // /**
@@ -44,9 +44,14 @@ trait StorageReadable extends Fetcher {
 
   def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
 
+
+
+
+
+
   protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
 
-  protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+//  protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
 
 
   def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
@@ -73,25 +78,25 @@ trait StorageReadable extends Fetcher {
     }
   }
 
-  def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
-    def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
-      if (kvs.isEmpty) Nil
-      else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
-    }
-
-    val futures = vertices.map { vertex =>
-      val queryParam = QueryParam.Empty
-      val q = Query.toQuery(Seq(vertex), Seq(queryParam))
-      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-
-      fetchKeyValues(queryRequest, vertex).map { kvs =>
-        fromResult(kvs, vertex.serviceColumn.schemaVersion)
-      } recoverWith {
-        case ex: Throwable => Future.successful(Nil)
-      }
-    }
-
-    Future.sequence(futures).map(_.flatten)
-  }
+//  def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
+//    def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
+//      if (kvs.isEmpty) Nil
+//      else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+//    }
+//
+//    val futures = vertices.map { vertex =>
+//      val queryParam = QueryParam.Empty
+//      val q = Query.toQuery(Seq(vertex), Seq(queryParam))
+//      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+//
+//      fetchKeyValues(queryRequest, vertex).map { kvs =>
+//        fromResult(kvs, vertex.serviceColumn.schemaVersion)
+//      } recoverWith {
+//        case ex: Throwable => Future.successful(Nil)
+//      }
+//    }
+//
+//    Future.sequence(futures).map(_.flatten)
+//  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
deleted file mode 100644
index 8c2fb27..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage
-
-import org.apache.s2graph.core.Mutator
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait OptimisticMutator extends Mutator {
-  /**
-    * decide how to store given key values Seq[SKeyValue] into storage using storage's client.
-    * note that this should be return true on all success.
-    * we assumes that each storage implementation has client as member variable.
-    *
-    * @param cluster  : where this key values should be stored.
-    * @param kvs      : sequence of SKeyValue that need to be stored in storage.
-    * @param withWait : flag to control wait ack from storage.
-    *                 note that in AsynchbaseStorage(which support asynchronous operations), even with true,
-    *                 it never block thread, but rather submit work and notified by event loop when storage send ack back.
-    * @return ack message from storage.
-    */
-  def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
-
-  /**
-    * write requestKeyValue into storage if the current value in storage that is stored matches.
-    * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge.
-    *
-    * Most important thing is this have to be 'atomic' operation.
-    * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be
-    * either blocked or failed on write-write conflict case.
-    *
-    * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to
-    * prevent wrong data for read.
-    *
-    * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction,
-    * compareAndSet to synchronize.
-    *
-    * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
-    * for storage that does not support concurrency control, then storage implementation
-    * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues)
-    * and write(writeLock).
-    *
-    * @param requestKeyValue
-    * @param expectedOpt
-    * @return
-    */
-  def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
index bfc5bc6..18159f6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
@@ -32,7 +32,7 @@ class WriteWriteConflictResolver(graph: S2GraphLike,
                                  serDe: StorageSerDe,
                                  io: StorageIO,
                                  mutator: OptimisticMutator,
-                                 fetcher: StorageReadable) {
+                                 optimisticEdgeFetcher: OptimisticEdgeFetcher) {
   val BackoffTimeout = graph.BackoffTimeout
   val MaxRetryNum = graph.MaxRetryNum
   val MaxBackOff = graph.MaxBackOff
@@ -68,7 +68,7 @@ class WriteWriteConflictResolver(graph: S2GraphLike,
         case FetchTimeoutException(retryEdge) =>
           logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
           /* fetch failed. re-fetch should be done */
-          fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+          optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
             retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
           }
 
@@ -90,7 +90,7 @@ class WriteWriteConflictResolver(graph: S2GraphLike,
               val future = if (failedStatusCode == 0) {
                 // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
                 /* fetch failed. re-fetch should be done */
-                fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+                optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
                   retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
                 }
               } else {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
new file mode 100644
index 0000000..3d25dd9
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import java.util
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.storage.serde.Serializable
+import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2Graph, S2GraphLike}
+import org.apache.s2graph.core.storage.{CanSKeyValue, StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.utils.{CanDefer, Extensions}
+import org.hbase.async.{HBaseClient, KeyValue}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class AsynchbaseEdgeBulkFetcher(val graph: S2GraphLike,
+                                val config: Config,
+                                val client: HBaseClient,
+                                val serDe: StorageSerDe,
+                                val io: StorageIO) extends EdgeBulkFetcher {
+  import Extensions.DeferOps
+  import CanDefer._
+  import scala.collection.JavaConverters._
+  import AsynchbaseStorage._
+
+  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
+    val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
+      val distinctLabels = labels.toSet
+      val scan = AsynchbasePatcher.newScanner(client, hTableName)
+      scan.setFamily(Serializable.edgeCf)
+      scan.setMaxVersions(1)
+
+      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
+        case null => Seq.empty
+        case kvsLs =>
+          kvsLs.asScala.flatMap { kvs =>
+            kvs.asScala.flatMap { kv =>
+              val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
+
+              serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
+                .fromKeyValues(Seq(kv), None)
+                .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
+            }
+          }
+      }
+    }
+
+    Future.sequence(futures).map(_.flatten)
+  }
+}