You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/11 03:05:46 UTC

[06/11] incubator-s2graph git commit: Start implementing ResourceManager.

Start implementing ResourceManager.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/be83d07c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/be83d07c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/be83d07c

Branch: refs/heads/master
Commit: be83d07ca5ecb271bd3678c38734d1176182c286
Parents: 43f627e
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed May 9 19:25:15 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed May 9 19:25:15 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/schema/schema.sql   |   1 +
 .../apache/s2graph/core/EdgeBulkFetcher.scala   |  28 ---
 .../org/apache/s2graph/core/EdgeFetcher.scala   |   3 +
 .../org/apache/s2graph/core/EdgeMutator.scala   |   7 +
 .../org/apache/s2graph/core/Management.scala    |  57 +++++-
 .../apache/s2graph/core/ResourceManager.scala   | 130 +++++++++++++
 .../scala/org/apache/s2graph/core/S2Graph.scala |  49 ++---
 .../apache/s2graph/core/S2GraphFactory.scala    |   2 +-
 .../org/apache/s2graph/core/S2GraphLike.scala   |  38 ++--
 .../apache/s2graph/core/VertexBulkFetcher.scala |  26 ---
 .../org/apache/s2graph/core/VertexFetcher.scala |   4 +
 .../org/apache/s2graph/core/VertexMutator.scala |   5 +
 .../s2graph/core/fetcher/FetcherManager.scala   | 106 -----------
 .../core/fetcher/MemoryModelEdgeFetcher.scala   |  26 ++-
 .../apache/s2graph/core/io/Conversions.scala    |   6 +-
 .../org/apache/s2graph/core/schema/Label.scala  |  34 ++--
 .../s2graph/core/schema/ServiceColumn.scala     |  47 ++++-
 .../storage/DefaultOptimisticEdgeMutator.scala  | 176 +++++++++++++++++
 .../core/storage/DefaultOptimisticMutator.scala | 190 -------------------
 .../DefaultOptimisticVertexMutator.scala        |  44 +++++
 .../apache/s2graph/core/storage/Storage.scala   |   4 -
 .../hbase/AsynchbaseEdgeBulkFetcher.scala       |  69 -------
 .../storage/hbase/AsynchbaseEdgeFetcher.scala   |  31 ++-
 .../core/storage/hbase/AsynchbaseStorage.scala  |   7 +-
 .../hbase/AsynchbaseVertexBulkFetcher.scala     |  63 ------
 .../storage/hbase/AsynchbaseVertexFetcher.scala |  26 +++
 .../storage/rocks/RocksEdgeBulkFetcher.scala    |  68 -------
 .../core/storage/rocks/RocksEdgeFetcher.scala   |  35 +++-
 .../core/storage/rocks/RocksStorage.scala       |   7 +-
 .../storage/rocks/RocksVertexBulkFetcher.scala  |  88 ---------
 .../core/storage/rocks/RocksVertexFetcher.scala |  53 ++++++
 .../s2graph/core/utils/SafeUpdateCache.scala    |   6 +-
 .../s2graph/core/fetcher/EdgeFetcherTest.scala  |  12 +-
 .../apache/s2graph/graphql/GraphQLServer.scala  |   8 +-
 .../org/apache/s2graph/graphql/HttpServer.scala |   4 +-
 35 files changed, 710 insertions(+), 750 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
index 6b9b71e..4f7f832 100644
--- a/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
+++ b/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
@@ -48,6 +48,7 @@ CREATE TABLE `service_columns` (
   `column_name` varchar(64) NOT NULL,
   `column_type` varchar(8) NOT NULL,
   `schema_version` varchar(8) NOT NULL default 'v2',
+  `options` text,
   PRIMARY KEY (`id`),
   UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
deleted file mode 100644
index 646f5f4..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import com.typesafe.config.Config
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait EdgeBulkFetcher {
-  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
index f28a161..c3760e0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
@@ -31,5 +31,8 @@ trait EdgeFetcher {
   def fetches(queryRequests: Seq[QueryRequest],
               prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
 
+  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
+
   def close(): Unit = {}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
index dc0099e..252d129 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
@@ -19,11 +19,13 @@
 
 package org.apache.s2graph.core
 
+import com.typesafe.config.Config
 import org.apache.s2graph.core.storage.MutateResponse
 
 import scala.concurrent.{ExecutionContext, Future}
 
 trait EdgeMutator {
+
   def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]]
 
   def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]]
@@ -35,4 +37,9 @@ trait EdgeMutator {
   def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
                                     requestTs: Long,
                                     retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean]
+
+  def close(): Unit = {}
+
+  def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 9046449..c3aef7a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -95,14 +95,15 @@ object Management {
                           columnName: String,
                           columnType: String,
                           props: Seq[Prop],
-                          schemaVersion: String = DEFAULT_VERSION) = {
+                          schemaVersion: String = DEFAULT_VERSION,
+                          options: Option[String] = None) = {
 
     Schema withTx { implicit session =>
       val serviceOpt = Service.findByName(serviceName, useCache = false)
       serviceOpt match {
         case None => throw new RuntimeException(s"create service $serviceName has not been created.")
         case Some(service) =>
-          val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false)
+          val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, options, useCache = false)
           for {
             Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props
           } yield {
@@ -304,13 +305,50 @@ class Management(graph: S2GraphLike) {
 
   import Management._
 
-  def importModel(labelName: String, options: String): Future[Importer] = {
-    Label.updateOption(labelName, options)
+  def updateEdgeFetcher(labelName: String, options: String): Unit = {
+    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
 
-    val label = Label.findByName(labelName, false).getOrElse(throw new LabelNotExistException(labelName))
-    val config = ConfigFactory.parseString(options)
+    updateEdgeFetcher(label, options)
+  }
+
+  def updateEdgeFetcher(label: Label, options: String): Unit = {
+    val newLabel = Label.updateOption(label, options)
+    graph.resourceManager.getOrElseUpdateEdgeFetcher(newLabel, forceUpdate = true)
+  }
+
+  def updateVertexFetcher(serviceName: String, columnName: String, options: String): Unit = {
+    val service = Service.findByName(serviceName).getOrElse(throw new IllegalArgumentException(s"$serviceName is not exist."))
+    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new IllegalArgumentException(s"$columnName is not exist."))
+
+    updateVertexFetcher(column, options)
+  }
+
+  def updateVertexFetcher(column: ServiceColumn, options: String): Unit = {
+    val newColumn = ServiceColumn.updateOption(column, options)
+    graph.resourceManager.getOrElseUpdateVertexFetcher(newColumn, forceUpdate = true)
+  }
+
+  def updateEdgeMutator(labelName: String, options: String): Unit = {
+    val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+
+    updateEdgeMutator(label, options)
+  }
+
+  def updateEdgeMutator(label: Label, options: String): Unit = {
+    val newLabel = Label.updateOption(label, options)
+    graph.resourceManager.getOrElseUpdateEdgeMutator(newLabel, forceUpdate = true)
+  }
+
+  def updateVertexMutator(serviceName: String, columnName: String, options: String): Unit = {
+    val service = Service.findByName(serviceName).getOrElse(throw new IllegalArgumentException(s"$serviceName is not exist."))
+    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new IllegalArgumentException(s"$columnName is not exist."))
+
+    updateVertexMutator(column, options)
+  }
 
-    graph.modelManager.importModel(label, config)(importEx)
+  def updateVertexMutator(column: ServiceColumn, options: String): Unit = {
+    val newColumn = ServiceColumn.updateOption(column, options)
+    graph.resourceManager.getOrElseUpdateVertexMutator(newColumn, forceUpdate = true)
   }
 
   def createStorageTable(zkAddr: String,
@@ -375,14 +413,15 @@ class Management(graph: S2GraphLike) {
                           columnName: String,
                           columnType: String,
                           props: Seq[Prop],
-                          schemaVersion: String = DEFAULT_VERSION): ServiceColumn = {
+                          schemaVersion: String = DEFAULT_VERSION,
+                          options: Option[String] = None): ServiceColumn = {
 
     val serviceColumnTry = Schema withTx { implicit session =>
       val serviceOpt = Service.findByName(serviceName, useCache = false)
       serviceOpt match {
         case None => throw new RuntimeException(s"create service $serviceName has not been created.")
         case Some(service) =>
-          val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false)
+          val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, options, useCache = false)
           for {
             Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props
           } yield {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
new file mode 100644
index 0000000..b877603
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
+import org.apache.s2graph.core.utils.SafeUpdateCache
+import scala.concurrent.ExecutionContext
+
+
+/** Cache-key prefixes and a default SafeUpdateCache config for [[ResourceManager]]. */
+object ResourceManager {
+  import SafeUpdateCache._
+  import scala.collection.JavaConverters._
+
+  // Option key naming the implementation class that is instantiated reflectively.
+  val ClassNameKey = "className"
+
+  // classOf[T].getName, not classOf[T].getClass.getName: the latter is always
+  // "java.lang.Class", so fetcher and mutator cache keys would collide.
+  val EdgeFetcherKey = classOf[EdgeFetcher].getName
+  val VertexFetcherKey = classOf[VertexFetcher].getName
+  val EdgeMutatorKey = classOf[EdgeMutator].getName
+  val VertexMutatorKey = classOf[VertexMutator].getName
+  val DefaultConfig = ConfigFactory.parseMap(Map(MaxSizeKey -> 1000, TtlKey -> -1).asJava)
+}
+
+class ResourceManager(graph: S2GraphLike,
+                      _config: Config)(implicit ec: ExecutionContext) {
+
+  import ResourceManager._
+
+  import scala.collection.JavaConverters._
+
+  val cache = new SafeUpdateCache(_config)
+
+  def getAllVertexFetchers(): Seq[VertexFetcher] = {
+    cache.asMap().asScala.toSeq.collect { case (_, (obj: VertexFetcher, _, _)) => obj }
+  }
+
+  def getAllEdgeFetchers(): Seq[EdgeFetcher] = {
+    cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) => obj }
+  }
+
+  def getOrElseUpdateVertexFetcher(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexFetcher] = {
+    val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + column.columnName
+    cache.withCache(cacheKey, false, forceUpdate) {
+      column.toFetcherConfig.map { fetcherConfig =>
+        val className = fetcherConfig.getString(ClassNameKey)
+        val fetcher = Class.forName(className)
+          .getConstructor(classOf[S2GraphLike])
+          .newInstance(graph)
+          .asInstanceOf[VertexFetcher]
+
+        fetcher.init(fetcherConfig)
+
+        fetcher
+      }
+    }
+  }
+
+  def getOrElseUpdateEdgeFetcher(label: Label, forceUpdate: Boolean = false): Option[EdgeFetcher] = {
+    val cacheKey = EdgeFetcherKey + "_" + label.label
+
+    cache.withCache(cacheKey, false, forceUpdate) {
+      label.toFetcherConfig.map { fetcherConfig =>
+        val className = fetcherConfig.getString(ClassNameKey)
+        val fetcher = Class.forName(className)
+          .getConstructor(classOf[S2GraphLike])
+          .newInstance(graph)
+          .asInstanceOf[EdgeFetcher]
+
+        fetcher.init(fetcherConfig)
+
+        fetcher
+      }
+    }
+  }
+
+  def getOrElseUpdateVertexMutator(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexMutator] = {
+    val cacheKey = VertexMutatorKey + "_" + column.service.serviceName + "_" + column.columnName
+    cache.withCache(cacheKey, false, forceUpdate) {
+      column.toMutatorConfig.map { mutatorConfig =>
+        val className = mutatorConfig.getString(ClassNameKey)
+        val fetcher = Class.forName(className)
+          .getConstructor(classOf[S2GraphLike])
+          .newInstance(graph)
+          .asInstanceOf[VertexMutator]
+
+        fetcher.init(mutatorConfig)
+
+        fetcher
+      }
+    }
+  }
+
+  def getOrElseUpdateEdgeMutator(label: Label, forceUpdate: Boolean = false): Option[EdgeMutator] = {
+    val cacheKey = EdgeMutatorKey + "_" + label.label
+    cache.withCache(cacheKey, false, forceUpdate) {
+      label.toMutatorConfig.map { mutatorConfig =>
+        val className = mutatorConfig.getString(ClassNameKey)
+        val fetcher = Class.forName(className)
+          .getConstructor(classOf[S2GraphLike])
+          .newInstance(graph)
+          .asInstanceOf[EdgeMutator]
+
+        fetcher.init(mutatorConfig)
+
+        fetcher
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index c4cb48f..09fd55e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -20,30 +20,27 @@
 package org.apache.s2graph.core
 
 import java.util
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{ExecutorService, Executors}
 
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.commons.configuration.{BaseConfiguration, Configuration}
 import org.apache.s2graph.core.index.IndexProvider
 import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
-import org.apache.s2graph.core.fetcher.FetcherManager
 import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.storage.rocks.RocksStorage
 import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage}
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
-import org.apache.tinkerpop.gremlin.process.traversal.{P, TraversalStrategies}
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
+import org.apache.s2graph.core.utils.{Extensions, Importer, logger}
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies
 import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.{Random, Try}
+import scala.util.Try
 
 
 object S2Graph {
@@ -94,6 +91,7 @@ object S2Graph {
   val numOfThread = Runtime.getRuntime.availableProcessors()
   val threadPool = Executors.newFixedThreadPool(numOfThread)
   val ec = ExecutionContext.fromExecutor(threadPool)
+  val resourceManagerEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numOfThread))
 
   val DefaultServiceName = ""
   val DefaultColumnName = "vertex"
@@ -187,7 +185,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
 
   override val management = new Management(this)
 
-  override val modelManager = new FetcherManager(this)
+  override val resourceManager: ResourceManager = new ResourceManager(this, config)(S2Graph.resourceManagerEc)
 
   override val indexProvider = IndexProvider.apply(config)
 
@@ -250,32 +248,39 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
     storagePool.getOrElse(s"label:${label.label}", defaultStorage)
   }
 
-  //TODO:
+  /* Currently, the Fetcher and Mutator getters below lack a proper implementation.
+  *  Please discuss the proper way to maintain resources here and provide
+  *  the right implementation (S2GRAPH-213).
+  * */
   override def getVertexFetcher(column: ServiceColumn): VertexFetcher = {
-    getStorage(column.service).vertexFetcher
-  }
-  override def getVertexBulkFetcher: VertexBulkFetcher = {
-    defaultStorage.vertexBulkFetcher
+    resourceManager.getOrElseUpdateVertexFetcher(column)
+      .getOrElse(defaultStorage.vertexFetcher)
   }
 
   override def getEdgeFetcher(label: Label): EdgeFetcher = {
-    if (label.fetchConfigExist) modelManager.getFetcher(label)
-    else getStorage(label).edgeFetcher
+    resourceManager.getOrElseUpdateEdgeFetcher(label)
+      .getOrElse(defaultStorage.edgeFetcher)
   }
 
-  override def getEdgeBulkFetcher: EdgeBulkFetcher = {
-    defaultStorage.edgeBulkFetcher
+  override def getAllVertexFetchers(): Seq[VertexFetcher] = {
+    resourceManager.getAllVertexFetchers()
+  }
+
+  override def getAllEdgeFetchers(): Seq[EdgeFetcher] = {
+    resourceManager.getAllEdgeFetchers()
   }
 
   override def getVertexMutator(column: ServiceColumn): VertexMutator = {
-    getStorage(column.service).vertexMutator
+    resourceManager.getOrElseUpdateVertexMutator(column)
+      .getOrElse(defaultStorage.vertexMutator)
   }
 
   override def getEdgeMutator(label: Label): EdgeMutator = {
-    getStorage(label).edgeMutator
+    resourceManager.getOrElseUpdateEdgeMutator(label)
+      .getOrElse(defaultStorage.edgeMutator)
   }
 
-  /** optional */
+  //TODO:
   override def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher = {
 //    getStorage(label).optimisticEdgeFetcher
     null

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
index cce05af..0a667ef 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
@@ -66,7 +66,7 @@ object S2GraphFactory {
     val DefaultService = management.createService(DefaultServiceName, "localhost", "s2graph", 0, None).get
 
     //    Management.deleteColumn(DefaultServiceName, DefaultColumnName)
-    val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, DefaultColumnName, Some("integer"), HBaseType.DEFAULT_VERSION, useCache = false)
+    val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, DefaultColumnName, Some("integer"), HBaseType.DEFAULT_VERSION, options = None, useCache = false)
 
     val DefaultColumnMetas = {
       ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", "-", storeInGlobalIndex = true, useCache = false)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index 5e2c168..99423d6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -19,22 +19,20 @@
 
 package org.apache.s2graph.core
 
-import java.util
-import java.util.concurrent.{CompletableFuture, TimeUnit}
-import java.util.concurrent.atomic.AtomicLong
 import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.util
 import java.util.Optional
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{CompletableFuture, TimeUnit}
 
 import com.typesafe.config.Config
 import org.apache.commons.configuration.Configuration
 import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
-import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
 import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
 import org.apache.s2graph.core.index.IndexProvider
-import org.apache.s2graph.core.fetcher.FetcherManager
 import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn}
 import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage}
-import org.apache.s2graph.core.types.{InnerValLike, VertexId}
+import org.apache.s2graph.core.types.VertexId
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
 import org.apache.tinkerpop.gremlin.structure
 import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions
@@ -44,10 +42,10 @@ import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Element, Graph,
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.compat.java8.FutureConverters._
 import scala.compat.java8.OptionConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
 
 
 trait S2GraphLike extends Graph {
@@ -69,7 +67,7 @@ trait S2GraphLike extends Graph {
 
   val traversalHelper: TraversalHelper
 
-  val modelManager: FetcherManager
+  val resourceManager: ResourceManager
 
   lazy val MaxRetryNum: Int = config.getInt("max.retry.number")
   lazy val MaxBackOff: Int = config.getInt("max.back.off")
@@ -95,11 +93,11 @@ trait S2GraphLike extends Graph {
 
   def getVertexFetcher(column: ServiceColumn): VertexFetcher
 
-  def getVertexBulkFetcher(): VertexBulkFetcher
-
   def getEdgeFetcher(label: Label): EdgeFetcher
 
-  def getEdgeBulkFetcher(): EdgeBulkFetcher
+  def getAllVertexFetchers(): Seq[VertexFetcher]
+
+  def getAllEdgeFetchers(): Seq[EdgeFetcher]
 
   /** optional */
   def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher
@@ -211,7 +209,13 @@ trait S2GraphLike extends Graph {
 
     if (ids.isEmpty) {
       //TODO: default storage need to be fixed.
-      Await.result(getVertexBulkFetcher().fetchVerticesAll(), WaitTimeout).iterator
+      val futures = getAllVertexFetchers.map { vertexFetcher =>
+        vertexFetcher.fetchVerticesAll()
+      }
+
+      val future = Future.sequence(futures)
+
+      Await.result(future, WaitTimeout).flatten.iterator
     } else {
       val vertices = ids.collect {
         case s2Vertex: S2VertexLike => s2Vertex
@@ -236,7 +240,13 @@ trait S2GraphLike extends Graph {
   def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
     if (edgeIds.isEmpty) {
       // FIXME
-      Await.result(getEdgeBulkFetcher().fetchEdgesAll(), WaitTimeout).iterator
+      val futures = getAllEdgeFetchers().map { edgeFetcher =>
+        edgeFetcher.fetchEdgesAll()
+      }
+
+      val future = Future.sequence(futures)
+
+      Await.result(future, WaitTimeout).flatten.iterator
     } else {
       Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
deleted file mode 100644
index cbebab5..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait VertexBulkFetcher {
-  def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
index 5c10d18..b641e7f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
@@ -26,6 +26,10 @@ import scala.concurrent.{ExecutionContext, Future}
 
 trait VertexFetcher {
   def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
+
   def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
+
+  def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
+
   def close(): Unit = {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
index 18be890..d1c8ecf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
@@ -19,10 +19,15 @@
 
 package org.apache.s2graph.core
 
+import com.typesafe.config.Config
 import org.apache.s2graph.core.storage.MutateResponse
 
 import scala.concurrent.{ExecutionContext, Future}
 
 trait VertexMutator {
+  def close(): Unit = {}
+
+  def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
+
   def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
deleted file mode 100644
index 26db7ff..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.fetcher
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.utils.{Importer, logger}
-import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-object FetcherManager {
-  val ClassNameKey = "className"
-}
-
-class FetcherManager(s2GraphLike: S2GraphLike) {
-
-  import FetcherManager._
-
-  private val fetcherPool = scala.collection.mutable.Map.empty[String, EdgeFetcher]
-
-  private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer]
-
-  def toImportLockKey(label: Label): String = label.label
-
-  def getFetcher(label: Label): EdgeFetcher = {
-    fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported."))
-  }
-
-  def initImporter(config: Config): Importer = {
-    val className = config.getString(ClassNameKey)
-
-    Class.forName(className)
-      .getConstructor(classOf[S2GraphLike])
-      .newInstance(s2GraphLike)
-      .asInstanceOf[Importer]
-  }
-
-  def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[EdgeFetcher] = {
-    val className = config.getString(ClassNameKey)
-
-    val fetcher = Class.forName(className)
-      .getConstructor(classOf[S2GraphLike])
-      .newInstance(s2GraphLike)
-      .asInstanceOf[EdgeFetcher]
-
-    fetcher.init(config)
-
-    Future.successful(fetcher)
-  }
-
-  def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
-    val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] {
-      override def apply(k: String): Importer = {
-        val importer = initImporter(config.getConfig("importer"))
-
-        //TODO: Update Label's extra options.
-        importer
-          .run(config.getConfig("importer"))
-          .map { importer =>
-            logger.info(s"Close importer")
-            importer.close()
-
-            initFetcher(config.getConfig("fetcher")).map { fetcher =>
-              importer.setStatus(true)
-
-              fetcherPool
-                .remove(k)
-                .foreach { oldFetcher =>
-                  logger.info(s"Delete old storage ($k) => $oldFetcher")
-                  oldFetcher.close()
-                }
-
-              fetcherPool += (k -> fetcher)
-            }
-          }
-          .onComplete { _ =>
-            logger.info(s"ImportLock release: $k")
-            ImportLock.remove(k)
-          }
-
-        importer
-      }
-    })
-
-    Future.successful(importer)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
index bf90d69..110d615 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
@@ -37,18 +37,26 @@ class MemoryModelEdgeFetcher(val graph: S2GraphLike) extends EdgeFetcher {
   override def fetches(queryRequests: Seq[QueryRequest],
                        prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
     val stepResultLs = queryRequests.map { queryRequest =>
-      val queryParam = queryRequest.queryParam
-      val edges = ranges.map { ith =>
-        val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString)
+      toEdges(queryRequest)
+    }
 
-        graph.toEdge(queryRequest.vertex.innerIdVal,
-          tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction)
-      }
+    Future.successful(stepResultLs)
+  }
 
-      val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
-      StepResult(edgeWithScores, Nil, Nil)
+  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
+    Future.successful(Nil)
+  }
+
+  private def toEdges(queryRequest: QueryRequest) = {
+    val queryParam = queryRequest.queryParam
+    val edges = ranges.map { ith =>
+      val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString)
+
+      graph.toEdge(queryRequest.vertex.innerIdVal,
+        tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction)
     }
 
-    Future.successful(stepResultLs)
+    val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+    StepResult(edgeWithScores, Nil, Nil)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
index 15f1231..948cdd8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
@@ -75,7 +75,8 @@ object Conversions {
       (JsPath \ "serviceId").read[Int] and
       (JsPath \ "columnName").read[String] and
       (JsPath \ "columnType").read[String] and
-      (JsPath \ "schemaVersion").read[String]
+      (JsPath \ "schemaVersion").read[String] and
+      (JsPath \ "options").readNullable[String]
     )(ServiceColumn.apply _)
 
   implicit val serviceColumnWrites: Writes[ServiceColumn] = (
@@ -83,7 +84,8 @@ object Conversions {
       (JsPath \ "serviceId").write[Int] and
       (JsPath \ "columnName").write[String] and
       (JsPath \ "columnType").write[String] and
-      (JsPath \ "schemaVersion").write[String]
+      (JsPath \ "schemaVersion").write[String] and
+      (JsPath \ "options").writeNullable[String]
     )(unlift(ServiceColumn.unapply))
 
   implicit val columnMetaReads: Reads[ColumnMeta] = (

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
index cca1769..f3ce5e0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
@@ -60,7 +60,9 @@ object Label extends SQLSyntaxSupport[Label] {
         select *
         from labels
         where label = ${labelName}
-        and deleted_at is null """.map { rs => Label(rs) }.single.apply()
+        and deleted_at is null """.map { rs =>
+          Label(rs)
+      }.single.apply()
 
     if (useCache) withCache(cacheKey)(labelOpt)
     else labelOpt
@@ -109,15 +111,17 @@ object Label extends SQLSyntaxSupport[Label] {
         .map { rs => Label(rs) }.single.apply())
   }
 
-  def findById(id: Int)(implicit session: DBSession = AutoSession): Label = {
+  def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Label = {
     val cacheKey = className + "id=" + id
-    withCache(cacheKey)(
-      sql"""
+    lazy val sql = sql"""
         select 	*
         from 	labels
         where 	id = ${id}
         and deleted_at is null"""
-        .map { rs => Label(rs) }.single.apply()).get
+      .map { rs => Label(rs) }.single.apply()
+
+    if (useCache) withCache(cacheKey)(sql).get
+    else sql.get
   }
 
   def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
@@ -191,8 +195,8 @@ object Label extends SQLSyntaxSupport[Label] {
         val serviceId = service.id.get
 
         /** insert serviceColumn */
-        val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion)
-        val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion)
+        val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion, None)
+        val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion, None)
 
         if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
         if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")
@@ -259,18 +263,18 @@ object Label extends SQLSyntaxSupport[Label] {
     cnt
   }
 
-  def updateOption(labelName: String, options: String)(implicit session: DBSession = AutoSession) = {
+  def updateOption(label: Label, options: String)(implicit session: DBSession = AutoSession) = {
     scala.util.Try(Json.parse(options)).getOrElse(throw new RuntimeException("invalid Json option"))
-    logger.info(s"update options of label $labelName, ${options}")
-    val cnt = sql"""update labels set options = $options where label = $labelName""".update().apply()
-    val label = Label.findByName(labelName, useCache = false).get
+    logger.info(s"update options of label ${label.label}, ${options}")
+    val cnt = sql"""update labels set options = $options where id = ${label.id.get}""".update().apply()
+    val updatedLabel = findById(label.id.get, useCache = false)
 
     val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}")
     cacheKeys.foreach { key =>
       expireCache(className + key)
       expireCaches(className + key)
     }
-    cnt
+    updatedLabel
   }
 
   def delete(id: Int)(implicit session: DBSession = AutoSession) = {
@@ -390,12 +394,14 @@ case class Label(id: Option[Int], label: String,
 
   lazy val storageConfigOpt: Option[Config] = toStorageConfig
 
-  lazy val fetchConfigExist: Boolean = toFetcherConfig.isDefined
-
   def toFetcherConfig: Option[Config] = {
     Schema.toConfig(extraOptions, "fetcher")
   }
 
+  def toMutatorConfig: Option[Config] = {
+    Schema.toConfig(extraOptions, "mutator")
+  }
+
   def toStorageConfig: Option[Config] = {
     Schema.toConfig(extraOptions, "storage")
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
index cc1698a..61f1a09 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
@@ -19,9 +19,11 @@
 
 package org.apache.s2graph.core.schema
 
+import com.typesafe.config.Config
 import org.apache.s2graph.core.JSONParser
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
 import play.api.libs.json.Json
 import scalikejdbc._
 
@@ -29,10 +31,11 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
   import Schema._
   val className = ServiceColumn.getClass.getSimpleName
 
-  val Default = ServiceColumn(Option(0), -1, "default", "string", "v4")
+  val Default = ServiceColumn(Option(0), -1, "default", "string", "v4", None)
 
   def valueOf(rs: WrappedResultSet): ServiceColumn = {
-    ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version"))
+    ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"),
+      rs.string("column_type").toLowerCase(), rs.string("schema_version"), rs.stringOpt("options"))
   }
 
   def findByServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[ServiceColumn] = {
@@ -65,9 +68,9 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
       """.map { rs => ServiceColumn.valueOf(rs) }.single.apply()
     }
   }
-  def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = {
-    sql"""insert into service_columns(service_id, column_name, column_type, schema_version)
-         values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply()
+  def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String, options: Option[String])(implicit session: DBSession = AutoSession) = {
+    sql"""insert into service_columns(service_id, column_name, column_type, schema_version, options)
+         values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion}, ${options})""".execute.apply()
   }
   def delete(id: Int)(implicit session: DBSession = AutoSession) = {
     val serviceColumn = findById(id, useCache = false)
@@ -79,11 +82,14 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
       expireCaches(className + key)
     }
   }
-  def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
+  def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String],
+                   schemaVersion: String = HBaseType.DEFAULT_VERSION,
+                   options: Option[String],
+                   useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
     find(serviceId, columnName, useCache) match {
       case Some(sc) => sc
       case None =>
-        insert(serviceId, columnName, columnType, schemaVersion)
+        insert(serviceId, columnName, columnType, schemaVersion, options)
 //        val cacheKey = s"serviceId=$serviceId:columnName=$columnName"
         val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
         expireCache(className + cacheKey)
@@ -101,12 +107,29 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
 
     ls
   }
+  def updateOption(serviceColumn: ServiceColumn, options: String)(implicit session: DBSession = AutoSession) = {
+    scala.util.Try(Json.parse(options)).getOrElse(throw new RuntimeException("invalid Json option"))
+    logger.info(s"update options of service column ${serviceColumn.service.serviceName} ${serviceColumn.columnName}, ${options}")
+    val cnt = sql"""update service_columns set options = $options where id = ${serviceColumn.id.get}""".update().apply()
+    val column = findById(serviceColumn.id.get, useCache = false)
+
+    val cacheKeys = List(s"id=${column.id.get}",
+      s"serviceId=${serviceColumn.serviceId}:columnName=${serviceColumn.columnName}")
+
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+
+    column
+  }
 }
 case class ServiceColumn(id: Option[Int],
                          serviceId: Int,
                          columnName: String,
                          columnType: String,
-                         schemaVersion: String)  {
+                         schemaVersion: String,
+                         options: Option[String])  {
 
   lazy val service = Service.findById(serviceId)
   lazy val metasWithoutCache = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get, false) :+ ColumnMeta.lastModifiedAtColumn
@@ -147,5 +170,13 @@ case class ServiceColumn(id: Option[Int],
     }
   }
 
+  lazy val extraOptions = Schema.extraOptions(options)
 
+  def toFetcherConfig: Option[Config] = {
+    Schema.toConfig(extraOptions, "fetcher")
+  }
+
+  def toMutatorConfig: Option[Config] = {
+    Schema.toConfig(extraOptions, "mutator")
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala
new file mode 100644
index 0000000..4deecf5
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.LabelMeta
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class DefaultOptimisticEdgeMutator(graph: S2GraphLike,
+                                   serDe: StorageSerDe,
+                                   optimisticEdgeFetcher: OptimisticEdgeFetcher,
+                                   optimisticMutator: OptimisticMutator,
+                                   io: StorageIO) extends EdgeMutator {
+  lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, optimisticMutator, optimisticEdgeFetcher)
+
+  private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+    optimisticMutator.writeToStorage(cluster, kvs, withWait)
+
+  override def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+                                    requestTs: Long,
+                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
+    if (stepInnerResult.isEmpty) Future.successful(true)
+    else {
+      val head = stepInnerResult.edgeWithScores.head
+      val zkQuorum = head.edge.innerLabel.hbaseZkAddr
+      val futures = for {
+        edgeWithScore <- stepInnerResult.edgeWithScores
+      } yield {
+        val edge = edgeWithScore.edge
+
+        val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+        val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
+          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            io.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        /* reverted direction */
+        val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            io.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
+        writeToStorage(zkQuorum, mutations, withWait = true)
+      }
+
+      Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
+    }
+  }
+
+  override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
+    val mutations = _edges.flatMap { edge =>
+      val (_, edgeUpdate) =
+        if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
+        else S2Edge.buildOperation(None, Seq(edge))
+
+      val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+
+      if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+      io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+    }
+
+    writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+      _edges.zipWithIndex.map { case (edge, idx) =>
+        idx -> ret.isSuccess
+      }
+    }
+  }
+
+  override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+    def mutateEdgesInner(edges: Seq[S2EdgeLike],
+                         checkConsistency: Boolean,
+                         withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+      assert(edges.nonEmpty)
+      // TODO:: remove after code review: unreachable code
+      if (!checkConsistency) {
+
+        val futures = edges.map { edge =>
+          val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+
+          val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+          val mutations =
+            io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+
+          if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+
+          writeToStorage(zkQuorum, mutations, withWait)
+        }
+        Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
+      } else {
+        optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+          conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
+        }
+      }
+    }
+
+    val edgeWithIdxs = _edges.zipWithIndex
+    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+    } toSeq
+
+    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+      val edges = edgeGroup.map(_._1)
+      val idxs = edgeGroup.map(_._2)
+      // After deleteAll, process others
+      val mutateEdgeFutures = edges.toList match {
+        case head :: tail =>
+          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
+
+          //TODO: decide what we will do on failure on vertex put
+          val puts = io.buildVertexPutsAsync(head)
+          val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
+          Seq(edgeFuture, vertexFuture)
+        case Nil => Nil
+      }
+
+      val composed = for {
+        //        deleteRet <- Future.sequence(deleteAllFutures)
+        mutateRet <- Future.sequence(mutateEdgeFutures)
+      } yield mutateRet
+
+      composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
+    }
+
+    Future.sequence(mutateEdges).map { squashedRets =>
+      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+    }
+  }
+
+  override def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
+    val futures = for {
+      edge <- edges
+    } yield {
+      val kvs = for {
+        relEdge <- edge.relatedEdges
+        edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
+      } yield {
+        val countWithTs = edge.propertyValueInner(LabelMeta.count)
+        val countVal = countWithTs.innerVal.toString().toLong
+        io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
+      }
+      writeToStorage(zkQuorum, kvs, withWait = withWait)
+    }
+
+    Future.sequence(futures)
+  }
+
+  override def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+    val kvs = io.buildDegreePuts(edge, degreeVal)
+
+    writeToStorage(zkQuorum, kvs, withWait = true)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
deleted file mode 100644
index 82cc27a..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.s2graph.core.storage
-
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.schema.LabelMeta
-import org.apache.s2graph.core.utils.logger
-
-import scala.concurrent.{ExecutionContext, Future}
-
-class DefaultOptimisticMutator(graph: S2GraphLike,
-                               serDe: StorageSerDe,
-                               optimisticEdgeFetcher: OptimisticEdgeFetcher,
-                               optimisticMutator: OptimisticMutator) extends VertexMutator with EdgeMutator {
-
-  lazy val io: StorageIO = new StorageIO(graph, serDe)
-
-  lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, optimisticMutator, optimisticEdgeFetcher)
-
-  private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    optimisticMutator.writeToStorage(cluster, kvs, withWait)
-
-  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
-                                    requestTs: Long,
-                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
-    if (stepInnerResult.isEmpty) Future.successful(true)
-    else {
-      val head = stepInnerResult.edgeWithScores.head
-      val zkQuorum = head.edge.innerLabel.hbaseZkAddr
-      val futures = for {
-        edgeWithScore <- stepInnerResult.edgeWithScores
-      } yield {
-        val edge = edgeWithScore.edge
-
-        val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
-        val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-
-        val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
-        val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
-          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
-            io.buildIncrementsAsync(indexEdge, -1L)
-        }
-
-        /* reverted direction */
-        val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
-        val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
-          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
-            io.buildIncrementsAsync(indexEdge, -1L)
-        }
-
-        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-
-        writeToStorage(zkQuorum, mutations, withWait = true)
-      }
-
-      Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
-    }
-  }
-
-  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
-    if (vertex.op == GraphUtil.operations("delete")) {
-      writeToStorage(zkQuorum,
-        serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
-    } else if (vertex.op == GraphUtil.operations("deleteAll")) {
-      logger.info(s"deleteAll for vertex is truncated. $vertex")
-      Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
-    } else {
-      writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
-    }
-  }
-
-  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
-    val mutations = _edges.flatMap { edge =>
-      val (_, edgeUpdate) =
-        if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
-        else S2Edge.buildOperation(None, Seq(edge))
-
-      val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
-
-      if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
-      io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-    }
-
-    writeToStorage(zkQuorum, mutations, withWait).map { ret =>
-      _edges.zipWithIndex.map { case (edge, idx) =>
-        idx -> ret.isSuccess
-      }
-    }
-  }
-
-  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
-    def mutateEdgesInner(edges: Seq[S2EdgeLike],
-                         checkConsistency: Boolean,
-                         withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
-      assert(edges.nonEmpty)
-      // TODO:: remove after code review: unreachable code
-      if (!checkConsistency) {
-
-        val futures = edges.map { edge =>
-          val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
-
-          val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
-          val mutations =
-            io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-
-          if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
-
-          writeToStorage(zkQuorum, mutations, withWait)
-        }
-        Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
-      } else {
-        optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
-          conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
-        }
-      }
-    }
-
-    val edgeWithIdxs = _edges.zipWithIndex
-    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
-      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
-    } toSeq
-
-    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
-      val edges = edgeGroup.map(_._1)
-      val idxs = edgeGroup.map(_._2)
-      // After deleteAll, process others
-      val mutateEdgeFutures = edges.toList match {
-        case head :: tail =>
-          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
-
-          //TODO: decide what we will do on failure on vertex put
-          val puts = io.buildVertexPutsAsync(head)
-          val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
-          Seq(edgeFuture, vertexFuture)
-        case Nil => Nil
-      }
-
-      val composed = for {
-        //        deleteRet <- Future.sequence(deleteAllFutures)
-        mutateRet <- Future.sequence(mutateEdgeFutures)
-      } yield mutateRet
-
-      composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
-    }
-
-    Future.sequence(mutateEdges).map { squashedRets =>
-      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
-    }
-  }
-
-  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
-    val futures = for {
-      edge <- edges
-    } yield {
-      val kvs = for {
-        relEdge <- edge.relatedEdges
-        edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
-      } yield {
-        val countWithTs = edge.propertyValueInner(LabelMeta.count)
-        val countVal = countWithTs.innerVal.toString().toLong
-        io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
-      }
-      writeToStorage(zkQuorum, kvs, withWait = withWait)
-    }
-
-    Future.sequence(futures)
-  }
-
-  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
-    val kvs = io.buildDegreePuts(edge, degreeVal)
-
-    writeToStorage(zkQuorum, kvs, withWait = true)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala
new file mode 100644
index 0000000..6be619b
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{GraphUtil, S2GraphLike, S2VertexLike, VertexMutator}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class DefaultOptimisticVertexMutator(graph: S2GraphLike, // VertexMutator that writes vertex changes via optimisticMutator; graph is not referenced in this class body
+                                     serDe: StorageSerDe, // supplies the vertex serializer used on the "delete" path
+                                     optimisticEdgeFetcher: OptimisticEdgeFetcher, // not referenced in this class body
+                                     optimisticMutator: OptimisticMutator, // every storage write below goes through its writeToStorage
+                                     io: StorageIO) extends VertexMutator { // io builds the key-values for the default (put) path
+
+  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { // dispatches on vertex.op: "delete", "deleteAll", otherwise put
+    if (vertex.op == GraphUtil.operations("delete")) { // delete: serialize the vertex, mark each key-value as a Delete, then write
+      optimisticMutator.writeToStorage(zkQuorum,
+        serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
+    } else if (vertex.op == GraphUtil.operations("deleteAll")) { // deleteAll: nothing is written to storage in this branch
+      logger.info(s"deleteAll for vertex is truncated. $vertex")
+      Future.successful(MutateResponse.Success) // withWait is ignored here because a deleteAll operation may take a long time; Success is returned immediately
+    } else { // any other operation: write everything io.buildPutsAll produces for the vertex
+      optimisticMutator.writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 36ecfcb..bf620bf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -42,12 +42,8 @@ abstract class Storage(val graph: S2GraphLike,
 
   val edgeFetcher: EdgeFetcher
 
-  val edgeBulkFetcher: EdgeBulkFetcher
-
   val vertexFetcher: VertexFetcher
 
-  val vertexBulkFetcher: VertexBulkFetcher
-
   val edgeMutator: EdgeMutator
 
   val vertexMutator: VertexMutator

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
deleted file mode 100644
index 3d25dd9..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.hbase
-
-import java.util
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.storage.serde.Serializable
-import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2Graph, S2GraphLike}
-import org.apache.s2graph.core.storage.{CanSKeyValue, StorageIO, StorageSerDe}
-import org.apache.s2graph.core.types.HBaseType
-import org.apache.s2graph.core.utils.{CanDefer, Extensions}
-import org.hbase.async.{HBaseClient, KeyValue}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-class AsynchbaseEdgeBulkFetcher(val graph: S2GraphLike,
-                                val config: Config,
-                                val client: HBaseClient,
-                                val serDe: StorageSerDe,
-                                val io: StorageIO) extends EdgeBulkFetcher {
-  import Extensions.DeferOps
-  import CanDefer._
-  import scala.collection.JavaConverters._
-  import AsynchbaseStorage._
-
-  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
-    val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
-      val distinctLabels = labels.toSet
-      val scan = AsynchbasePatcher.newScanner(client, hTableName)
-      scan.setFamily(Serializable.edgeCf)
-      scan.setMaxVersions(1)
-
-      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
-        case null => Seq.empty
-        case kvsLs =>
-          kvsLs.asScala.flatMap { kvs =>
-            kvs.asScala.flatMap { kv =>
-              val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-
-              serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
-                .fromKeyValues(Seq(kv), None)
-                .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
-            }
-          }
-      }
-    }
-
-    Future.sequence(futures).map(_.flatten)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
index 4239d15..8eafc68 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
@@ -25,12 +25,14 @@ import com.stumbleupon.async.Deferred
 import com.typesafe.config.Config
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.storage.serde.Serializable
+import org.apache.s2graph.core.storage.{CanSKeyValue, StorageIO, StorageSerDe}
 import org.apache.s2graph.core.types.{HBaseType, VertexId}
 import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger}
 import org.hbase.async._
 
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{ExecutionContext, Future}
 
 class AsynchbaseEdgeFetcher(val graph: S2GraphLike,
                             val config: Config,
@@ -64,6 +66,31 @@ class AsynchbaseEdgeFetcher(val graph: S2GraphLike,
     }.toFuture(emptyStepResult).map(_.asScala)
   }
 
+  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = { // scans the edge column family of every label's HBase table and returns the decoded edges
+    val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) => // one scanner (and one future) per distinct table name
+      val distinctLabels = labels.toSet // labels mapped to this table; used below to filter decoded edges
+      val scan = AsynchbasePatcher.newScanner(client, hTableName)
+      scan.setFamily(Serializable.edgeCf) // restrict the scan to the edge column family
+      scan.setMaxVersions(1)
+
+      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { // nextRows is called once per table; presumably caps the batch at FetchAllLimit rows -- TODO confirm
+        case null => Seq.empty // a null result contributes no edges for this table
+        case kvsLs =>
+          kvsLs.asScala.flatMap { kvs =>
+            kvs.asScala.flatMap { kv => // each key-value is deserialized on its own
+              val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) // NOTE(review): sKV is not used below; the raw kv is what is passed to fromKeyValues
+
+              serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
+                .fromKeyValues(Seq(kv), None)
+                .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree) // keep "out" edges of this table's labels that are not degree entries
+            }
+          }
+      }
+    }
+
+    Future.sequence(futures).map(_.flatten) // combine the per-table results into one flat sequence
+  }
+
   /**
     * we are using future cache to squash requests into same key on storage.
     *

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index f65ee20..89303e6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -323,17 +323,14 @@ class AsynchbaseStorage(override val graph: S2GraphLike,
 
   private lazy val optimisticEdgeFetcher = new AsynchbaseOptimisticEdgeFetcher(client, serDe, io)
   private lazy val optimisticMutator = new AsynchbaseOptimisticMutator(graph, serDe, optimisticEdgeFetcher, client, clientWithFlush)
-  private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator)
 
   override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients)
   override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
 
   override val edgeFetcher: EdgeFetcher = new AsynchbaseEdgeFetcher(graph, config, client, serDe, io)
-  override val edgeBulkFetcher: EdgeBulkFetcher = new AsynchbaseEdgeBulkFetcher(graph, config, client, serDe, io)
   override val vertexFetcher: VertexFetcher = new AsynchbaseVertexFetcher(graph, config, client, serDe, io)
-  override val vertexBulkFetcher: VertexBulkFetcher = new AsynchbaseVertexBulkFetcher(graph, config, client, serDe, io)
 
-  override val edgeMutator: EdgeMutator = _mutator
-  override val vertexMutator: VertexMutator = _mutator
+  override val edgeMutator: EdgeMutator = new DefaultOptimisticEdgeMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator, io)
+  override val vertexMutator: VertexMutator = new DefaultOptimisticVertexMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator, io)
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
deleted file mode 100644
index e6bf4e6..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.hbase
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.schema.ServiceColumn
-import org.apache.s2graph.core.storage.serde.Serializable
-import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
-import org.apache.s2graph.core.types.HBaseType
-import org.apache.s2graph.core.utils.Extensions
-import org.apache.s2graph.core.{S2Graph, S2GraphLike, VertexBulkFetcher}
-import org.hbase.async.HBaseClient
-
-import scala.concurrent.{ExecutionContext, Future}
-
-class AsynchbaseVertexBulkFetcher(val graph: S2GraphLike,
-                                  val config: Config,
-                                  val client: HBaseClient,
-                                  val serDe: StorageSerDe,
-                                  val io: StorageIO) extends VertexBulkFetcher {
-
-  import AsynchbaseStorage._
-  import Extensions.DeferOps
-
-  import scala.collection.JavaConverters._
-
-  override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
-    val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) =>
-      val distinctColumns = columns.toSet
-      val scan = AsynchbasePatcher.newScanner(client, hTableName)
-      scan.setFamily(Serializable.vertexCf)
-      scan.setMaxVersions(1)
-
-      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
-        case null => Seq.empty
-        case kvsLs =>
-          kvsLs.asScala.flatMap { kvs =>
-            serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs.asScala, None)
-              .filter(v => distinctColumns(v.serviceColumn))
-          }
-      }
-    }
-    Future.sequence(futures).map(_.flatten)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
index 560dd2b..f16c8e9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
@@ -21,7 +21,11 @@ package org.apache.s2graph.core.storage.hbase
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.storage.serde.Serializable
 import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.utils.Extensions
 import org.hbase.async.HBaseClient
 
 import scala.concurrent.{ExecutionContext, Future}
@@ -32,6 +36,9 @@ class AsynchbaseVertexFetcher(val graph: S2GraphLike,
                               val serDe: StorageSerDe,
                               val io: StorageIO) extends VertexFetcher  {
   import AsynchbaseStorage._
+  import Extensions.DeferOps
+  import scala.collection.JavaConverters._
+
 
   private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
     val rpc = buildRequest(serDe, queryRequest, vertex)
@@ -58,4 +65,23 @@ class AsynchbaseVertexFetcher(val graph: S2GraphLike,
 
     Future.sequence(futures).map(_.flatten)
   }
+
+  override def fetchVerticesAll()(implicit ec: ExecutionContext) = { // scans the vertex column family of every service's HBase table and returns the decoded vertices
+    val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => // one scanner (and one future) per distinct table name
+      val distinctColumns = columns.toSet // service columns mapped to this table; used below to filter decoded vertices
+      val scan = AsynchbasePatcher.newScanner(client, hTableName)
+      scan.setFamily(Serializable.vertexCf) // restrict the scan to the vertex column family
+      scan.setMaxVersions(1)
+
+      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { // nextRows is called once per table; presumably caps the batch at FetchAllLimit rows -- TODO confirm
+        case null => Seq.empty // a null result contributes no vertices for this table
+        case kvsLs =>
+          kvsLs.asScala.flatMap { kvs => // each inner list of key-values is deserialized together
+            serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs.asScala, None)
+              .filter(v => distinctColumns(v.serviceColumn)) // keep only vertices whose service column belongs to this table
+          }
+      }
+    }
+    Future.sequence(futures).map(_.flatten) // combine the per-table results into one flat sequence
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
deleted file mode 100644
index 2ca4b35..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.rocks
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2GraphLike}
-import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
-import org.apache.s2graph.core.types.HBaseType
-import org.rocksdb.RocksDB
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
-
-class RocksEdgeBulkFetcher(val graph: S2GraphLike,
-                           val config: Config,
-                           val db: RocksDB,
-                           val vdb: RocksDB,
-                           val serDe: StorageSerDe,
-                           val io: StorageIO) extends EdgeBulkFetcher  {
-  import RocksStorage._
-
-  override def fetchEdgesAll()(implicit ec: ExecutionContext) = {
-    val edges = new ArrayBuffer[S2EdgeLike]()
-    Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) =>
-      val distinctLabels = labels.toSet
-
-      val iter = db.newIterator()
-      try {
-        iter.seekToFirst()
-        while (iter.isValid) {
-          val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis())
-
-          serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
-            .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
-            .foreach { edge =>
-              edges += edge
-            }
-
-
-          iter.next()
-        }
-
-      } finally {
-        iter.close()
-      }
-    }
-
-    Future.successful(edges)
-  }
-}