You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/11 03:05:43 UTC

[03/11] incubator-s2graph git commit: separate Storage into multiple small interfaces such as EdgeFetcher/VertexMutator, ...

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
new file mode 100644
index 0000000..4239d15
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import java.util
+
+import com.stumbleupon.async.Deferred
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.{HBaseType, VertexId}
+import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger}
+import org.hbase.async._
+
+import scala.concurrent.ExecutionContext
+
+class AsynchbaseEdgeFetcher(val graph: S2GraphLike,
+                            val config: Config,
+                            val client: HBaseClient,
+                            val serDe: StorageSerDe,
+                            val io: StorageIO) extends EdgeFetcher {
+
+  import AsynchbaseStorage._
+  import CanDefer._
+  import Extensions.DeferOps
+
+  import scala.collection.JavaConverters._
+
+  /** Future cache used to squash requests that share the same cache key. */
+  lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true)
+
+  /** Issues one fetch per query request and returns the step results grouped in request order. */
+  override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext) = {
+    val defers: Seq[Deferred[StepResult]] = for {
+      queryRequest <- queryRequests
+    } yield {
+      val queryOption = queryRequest.query.queryOption
+      val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+      fetch(queryRequest, isInnerCall = false, parentEdges)
+    }
+
+    val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers.asJava)
+    grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] =>
+      queryResults
+    }.toFuture(emptyStepResult).map(_.asScala)
+  }
+
+  /**
+    * Fetches the edges of one query request; when cacheTTLInMillis is positive the future cache squashes requests with the same key.
+    *
+    * @param queryRequest request to execute against HBase.
+    * @param isInnerCall  passed through to `io.toEdges` when the fetched key values are decoded.
+    * @param parentEdges  previous step's edges, used to build the request edge and passed through to `io.toEdges`.
+    * @return we use Deferred here since it has much better performance compared to scala.concurrent.Future.
+    *         seems like map, flatMap on scala.concurrent.Future is slower than Deferred's addCallback
+    */
+  private def fetch(queryRequest: QueryRequest,
+                    isInnerCall: Boolean,
+                    parentEdges: Seq[EdgeWithScore])(implicit ec: ExecutionContext): Deferred[StepResult] = {
+
+    def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = {
+      val prevStepScore = queryRequest.prevStepScore
+      val fallbackFn: (Exception => StepResult) = { ex =>
+        logger.error(s"fetchInner failed. fallback return. $hbaseRpc", ex)
+        StepResult.Failure
+      }
+
+      val queryParam = queryRequest.queryParam
+      AsynchbaseStorage.fetchKeyValuesInner(client, hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { _kvs =>
+        val kvs = _kvs.asScala
+        val (startOffset, len) = queryParam.label.schemaVersion match {
+          case HBaseType.VERSION4 =>
+            val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset
+            (offset, queryParam.limit)
+          case _ => (0, kvs.length)
+        }
+
+        io.toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len)
+      }
+    }
+
+    val queryParam = queryRequest.queryParam
+    val cacheTTL = queryParam.cacheTTLInMillis
+    /* the request is either a GetRequest (Left) or a ranged scan, ScanWithRange(scanner, offset, limit) (Right). */
+
+    val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
+    val request = buildRequest(client, serDe, queryRequest, edge)
+
+    if (cacheTTL <= 0) fetchInner(request)
+    else {
+      // the cache key is only needed on the cached path, so it is not built for uncached fetches.
+      val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
+      val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes)
+      val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey)
+
+      val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+      futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala
new file mode 100644
index 0000000..f03310c
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import org.apache.s2graph.core.{QueryRequest, S2EdgeLike}
+import org.apache.s2graph.core.storage.{OptimisticEdgeFetcher, SKeyValue, StorageIO, StorageSerDe}
+import org.hbase.async.HBaseClient
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class AsynchbaseOptimisticEdgeFetcher(val client: HBaseClient,
+                                      val serDe: StorageSerDe,
+                                      val io: StorageIO) extends OptimisticEdgeFetcher {
+  /** Builds the HBase RPC for `edge` and returns the key values it fetches as SKeyValues. */
+  override protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+    AsynchbaseStorage.fetchKeyValues(client, AsynchbaseStorage.buildRequest(client, serDe, queryRequest, edge))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala
new file mode 100644
index 0000000..8305e04
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.S2GraphLike
+import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.utils.{Extensions, logger}
+import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+class AsynchbaseOptimisticMutator(val graph: S2GraphLike,
+                                  val serDe: StorageSerDe,
+                                  val optimisticEdgeFetcher: OptimisticEdgeFetcher,
+                                  val client: HBaseClient,
+                                  val clientWithFlush: HBaseClient) extends OptimisticMutator {
+  import Extensions.DeferOps
+
+  private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
+  /**
+    * Writes the given key values to HBase through the asynchbase client.
+    * Increments are sent one request per key value; puts and deletes are grouped by
+    * (table, row, column family, operation, timestamp) and sent as one request per group.
+    *
+    * @param cluster  : where this key values should be stored (not read by this implementation).
+    * @param kvs      : sequence of SKeyValue that need to be stored in storage.
+    * @param withWait : flag to control waiting for the ack from storage. when false, requests are still
+    *                 submitted but success is reported without waiting. note that asynchbase never blocks a
+    *                 thread even with true; it submits work and is notified by its event loop on ack.
+    * @return a response whose isSuccess is true only if every increment and every put/delete group succeeded.
+    */
+  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
+    if (kvs.isEmpty) Future.successful(MutateResponse.Success)
+    else {
+      val _client = client(withWait)
+      val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment)
+
+      /* Asynchbase IncrementRequest does not implement HasQualifiers, so each increment is its own request. */
+      val incrementsFutures = increments.map { kv =>
+        val countVal = Bytes.toLong(kv.value)
+        val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, countVal)
+        val fallbackFn: (Exception => MutateResponse) = { ex =>
+          logger.error(s"mutation failed. $request", ex)
+          new IncrementResponse(false, -1L, -1L)
+        }
+        val future = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long =>
+          new IncrementResponse(true, resultCount.longValue(), countVal)
+        }.toFuture(MutateResponse.IncrementFailure)
+
+        if (withWait) future else Future.successful(MutateResponse.IncrementSuccess)
+      }
+
+      /* PutRequest and DeleteRequest accept byte[][] qualifiers/values, so each group below becomes one request. */
+      val othersFutures = putAndDeletes.groupBy { kv =>
+        (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
+      }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) =>
+
+        val durability = groupedKeyValues.head.durability
+        val qualifiers = new ArrayBuffer[Array[Byte]]()
+        val values = new ArrayBuffer[Array[Byte]]()
+
+        groupedKeyValues.foreach { kv =>
+          if (kv.qualifier != null) qualifiers += kv.qualifier
+          if (kv.value != null) values += kv.value
+        }
+        val defer = operation match {
+          case SKeyValue.Put =>
+            val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp)
+            put.setDurable(durability)
+            _client.put(put)
+          case SKeyValue.Delete =>
+            val delete =
+              if (qualifiers.isEmpty)
+                new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp)
+              else
+                new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp)
+            delete.setDurable(durability)
+            _client.delete(delete)
+        }
+        if (withWait) {
+          defer.toFuture(new AnyRef()).map(_ => MutateResponse.Success).recover { case ex: Exception =>
+            groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) }
+            MutateResponse.Failure
+          }
+        } else Future.successful(MutateResponse.Success)
+      }
+      for {
+        incrementRets <- Future.sequence(incrementsFutures)
+        otherRets <- Future.sequence(othersFutures)
+      } yield new MutateResponse(isSuccess = (incrementRets ++ otherRets).forall(_.isSuccess))
+    }
+  }
+
+  /**
+    * write requestKeyValue into storage only if the value currently stored in storage matches the expected one.
+    * note that we only use SnapshotEdge as the place for the lock, so this method only changes SnapshotEdge.
+    *
+    * The most important thing is that this has to be an 'atomic' operation.
+    * While this operation is mutating requestKeyValue's snapshotEdge, any other thread needs to be
+    * either blocked or failed in the write-write conflict case.
+    *
+    * Also, while this method is still running, fetchSnapshotEdgeKeyValues should be synchronized to
+    * prevent reading wrong data.
+    *
+    * It is best to use the storage's own concurrency control (either pessimistic or optimistic), such as
+    * transactions or compareAndSet, to synchronize.
+    *
+    * for example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
+    * a storage that does not support concurrency control can have its implementation
+    * maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues)
+    * and write(writeLock).
+    *
+    * @param rpc         key value to write; its table, row, cf, qualifier, value and timestamp form the PutRequest.
+    * @param expectedOpt key value whose value is expected to be stored now; None compares against an empty array.
+    * @return MutateResponse built from compareAndSet's boolean (the `true` arguments are fallbacks for the Deferred helpers — TODO confirm when they apply).
+    */
+  override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] = {
+    val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp)
+    val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
+    client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true)
+      .map(r => new MutateResponse(isSuccess = r))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 4be3767..f65ee20 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -21,25 +21,40 @@ package org.apache.s2graph.core.storage.hbase
 
 
 import java.util
+import java.util.Base64
 import java.util.concurrent.{ExecutorService, Executors}
 
+import com.stumbleupon.async.Deferred
 import com.typesafe.config.Config
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.storage.serde._
+import org.apache.s2graph.core.types.{HBaseType, VertexId}
 import org.apache.s2graph.core.utils._
+import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
 import org.hbase.async._
-import org.apache.s2graph.core.storage.serde._
+
 import scala.collection.JavaConversions._
+import scala.concurrent.{ExecutionContext, Future}
 
 
 object AsynchbaseStorage {
+  import Extensions.DeferOps
+  import CanDefer._
+
   val vertexCf = Serializable.vertexCf
   val edgeCf = Serializable.edgeCf
+
   val emptyKVs = new util.ArrayList[KeyValue]()
+  val emptyKeyValues = new util.ArrayList[KeyValue]()
+  val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]()
+  val emptyStepResult = new util.ArrayList[StepResult]()
 
   AsynchbasePatcher.init()
 
+
   def makeClient(config: Config, overrideKv: (String, String)*) = {
     val asyncConfig: org.hbase.async.Config =
       if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) {
@@ -135,6 +150,161 @@ object AsynchbaseStorage {
 
     hbaseExecutor
   }
+
+  /** Runs `rpc` through fetchKeyValuesInner and converts every fetched KeyValue into an SKeyValue. */
+  def fetchKeyValues(client: HBaseClient, rpc: AsyncRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+    // look the type-class instance up once instead of once per key value.
+    val converter = implicitly[CanSKeyValue[KeyValue]]
+    fetchKeyValuesInner(client, rpc)
+      .toFuture(emptyKeyValues)
+      .map { fetched => fetched.map { kv => converter.toSKeyValue(kv) } }
+  }
+
+  /**
+    * Runs `rpc` on `client`. A GetRequest (Left) is issued as is; a ScanWithRange (Right) reads a single
+    * `nextRows()` batch, closes the scanner and keeps the [offset, offset + limit) slice of the key values.
+    * Out-of-range offset/limit values yield a shorter (possibly empty) list instead of an exception.
+    */
+  def fetchKeyValuesInner(client: HBaseClient, rpc: AsyncRPC)(implicit ec: ExecutionContext): Deferred[util.ArrayList[KeyValue]] = {
+    rpc match {
+      case Left(get) => client.get(get)
+      case Right(ScanWithRange(scanner, offset, limit)) =>
+        val fallbackFn: (Exception => util.ArrayList[KeyValue]) = { ex =>
+          logger.error(s"fetchKeyValuesInner failed.", ex)
+          scanner.close()
+          emptyKeyValues
+        }
+
+        scanner.nextRows().mapWithFallback(new util.ArrayList[util.ArrayList[KeyValue]]())(fallbackFn) { kvsLs =>
+          val ls = new util.ArrayList[KeyValue]
+          // a null batch (or a null row inside it) contributes no key values.
+          if (kvsLs != null) kvsLs.foreach { kvs => if (kvs != null) ls.addAll(kvs) }
+
+          scanner.close()
+          // clamp both bounds (the sum is computed as Long to avoid Int overflow) so subList cannot throw.
+          val toIndex = math.max(0L, math.min(ls.size().toLong, offset.toLong + limit)).toInt
+          val fromIndex = math.min(math.max(0, offset), toIndex)
+          new util.ArrayList[KeyValue](ls.subList(fromIndex, toIndex))
+        }
+      case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc"))
+    }
+  }
+
+  /** Cache key bytes of a request: the key of a get, or the scanner's current key + offset + limit of a scan. */
+  def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = hbaseRpc match {
+    /* the request is either a GetRequest (Left) or a ScanWithRange(scanner, offset, limit) (Right). */
+    case Left(get) => get.key
+    case Right(ScanWithRange(scanner, offset, limit)) =>
+      val rangeBytes = Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit))
+      Bytes.add(scanner.getCurrentKey, rangeBytes)
+    case _ =>
+      logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc")
+      throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc")
+  }
+
+  /** Builds a fail-fast, single-version GetRequest on the vertex column family for the row of `vertex`. */
+  def buildRequest(serDe: StorageSerDe, queryRequest: QueryRequest, vertex: S2VertexLike) = {
+    val rowKey = serDe.vertexSerializer(vertex).toKeyValues.head.row
+    val vertexGet = new GetRequest(vertex.hbaseTableName.getBytes, rowKey, Serializable.vertexCf)
+    //      get.setTimeout(this.singleGetTimeout.toShort)
+    vertexGet.setFailfast(true)
+    vertexGet.maxVersions(1)
+    Left(vertexGet)
+  }
+
+  /**
+    * Builds the RPC for an edge query: a ranged Scanner for VERSION4 labels queried without a target vertex, otherwise a GetRequest.
+    */
+  def buildRequest(client: HBaseClient, serDe: StorageSerDe, queryRequest: QueryRequest, edge: S2EdgeLike) = {
+    val queryParam = queryRequest.queryParam
+    val label = queryParam.label
+
+    // a target vertex selects the snapshot edge serializer; without one the index edge serializer is used.
+    val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+      val snapshotEdge = edge.toSnapshotEdge
+      serDe.snapshotEdgeSerializer(snapshotEdge)
+    } else {
+      val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq)
+      serDe.indexEdgeSerializer(indexEdge)
+    }
+
+    val rowKey = serializer.toRowKey
+    val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
+
+    val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
+
+    label.schemaVersion match {
+      case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
+        val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName)
+        scanner.setFamily(SKeyValue.EdgeCf)
+
+        // TODO: remove this part.
+        val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
+        val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam"))
+
+        val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+        val labelWithDirBytes = indexEdge.labelWithDir.bytes
+        val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
+        val (startKey, stopKey) =
+          if (queryParam.intervalOpt.isDefined) {
+            // interval is set: scan from the cursor (or baseKey + interval max bytes) up to baseKey + interval min bytes.
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
+              case None => Bytes.add(baseKey, intervalMaxBytes)
+            }
+            (_startKey , Bytes.add(baseKey, intervalMinBytes))
+          } else {
+            // note: since propsToBytes encode size of property map at first byte, we are sure about max value here
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
+              case None => baseKey
+            }
+            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
+          }
+
+        scanner.setStartKey(startKey)
+        scanner.setStopKey(stopKey)
+
+        if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
+
+        scanner.setMaxVersions(1)
+        // TODO: exclusive condition innerOffset with cursorOpt
+        if (queryParam.cursorOpt.isDefined) {
+          scanner.setMaxNumRows(queryParam.limit)
+        } else {
+          scanner.setMaxNumRows(queryParam.innerOffset + queryParam.innerLimit)
+        }
+        scanner.setMaxTimestamp(maxTs)
+        scanner.setMinTimestamp(minTs)
+        scanner.setRpcTimeout(queryParam.rpcTimeout)
+
+        // the scan range always starts at offset 0; its limit is the same value given to setMaxNumRows above.
+        if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit))
+        else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit))
+
+      case _ =>
+        val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+          new GetRequest(label.hbaseTableName.getBytes, rowKey, SKeyValue.EdgeCf, serializer.toQualifier)
+        } else {
+          new GetRequest(label.hbaseTableName.getBytes, rowKey, SKeyValue.EdgeCf)
+        }
+
+        get.maxVersions(1)
+        get.setFailfast(true)
+        get.setMinTimestamp(minTs)
+        get.setMaxTimestamp(maxTs)
+        get.setTimeout(queryParam.rpcTimeout)
+
+        val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset)
+        val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
+          new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
+        }
+        get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL))
+        Left(get)
+    }
+  }
 }
 
 
@@ -149,11 +319,21 @@ class AsynchbaseStorage(override val graph: S2GraphLike,
   val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
   val clients = Seq(client, clientWithFlush)
 
-  override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients)
+//  private lazy val _fetcher = new AsynchbaseStorageFetcher(graph, config, client, serDe, io)
+
+  private lazy val optimisticEdgeFetcher = new AsynchbaseOptimisticEdgeFetcher(client, serDe, io)
+  private lazy val optimisticMutator = new AsynchbaseOptimisticMutator(graph, serDe, optimisticEdgeFetcher, client, clientWithFlush)
+  private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator)
 
+  override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients)
   override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
 
-  override val reader: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io)
+  override val edgeFetcher: EdgeFetcher = new AsynchbaseEdgeFetcher(graph, config, client, serDe, io)
+  override val edgeBulkFetcher: EdgeBulkFetcher = new AsynchbaseEdgeBulkFetcher(graph, config, client, serDe, io)
+  override val vertexFetcher: VertexFetcher = new AsynchbaseVertexFetcher(graph, config, client, serDe, io)
+  override val vertexBulkFetcher: VertexBulkFetcher = new AsynchbaseVertexBulkFetcher(graph, config, client, serDe, io)
+
+  override val edgeMutator: EdgeMutator = _mutator
+  override val vertexMutator: VertexMutator = _mutator
 
-  override val mutator: Mutator = new AsynchbaseStorageWritable(graph, serDe, reader, client, clientWithFlush)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
deleted file mode 100644
index 0526042..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- *
- */
-
-package org.apache.s2graph.core.storage.hbase
-
-import java.util
-import java.util.Base64
-
-import com.stumbleupon.async.Deferred
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.schema.{Label, ServiceColumn}
-import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.storage.serde._
-import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange}
-import org.apache.s2graph.core.types.{HBaseType, VertexId}
-import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger}
-import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
-import org.hbase.async._
-
-import scala.collection.JavaConversions._
-import scala.concurrent.{ExecutionContext, Future}
-
-class AsynchbaseStorageReadable(val graph: S2GraphLike,
-                                val config: Config,
-                                val client: HBaseClient,
-                                override val serDe: StorageSerDe,
-                                override val io: StorageIO) extends StorageReadable {
-  import Extensions.DeferOps
-  import CanDefer._
-
-  private val emptyKeyValues = new util.ArrayList[KeyValue]()
-  private val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]()
-  private val emptyStepResult = new util.ArrayList[StepResult]()
-
-  /** Future Cache to squash request */
-  lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true)
-  /** v4 max next row size */
-  private val v4_max_num_rows = 10000
-  private def getV4MaxNumRows(limit : Int): Int = {
-    if (limit < v4_max_num_rows) limit
-    else v4_max_num_rows
-  }
-
-  /**
-    * build proper request which is specific into storage to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
-    * for example, Asynchbase use GetRequest, Scanner so this method is responsible to build
-    * client request(GetRequest, Scanner) based on user provided query.
-    *
-    * @param queryRequest
-    * @return
-    */
-  private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike) = {
-    import Serializable._
-    val queryParam = queryRequest.queryParam
-    val label = queryParam.label
-
-    val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
-      val snapshotEdge = edge.toSnapshotEdge
-      serDe.snapshotEdgeSerializer(snapshotEdge)
-    } else {
-      val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq)
-      serDe.indexEdgeSerializer(indexEdge)
-    }
-
-    val rowKey = serializer.toRowKey
-    val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
-
-    val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
-
-    label.schemaVersion match {
-      case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
-        val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName)
-        scanner.setFamily(edgeCf)
-
-        /*
-         * TODO: remove this part.
-         */
-        val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
-        val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam"))
-
-        val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
-        val labelWithDirBytes = indexEdge.labelWithDir.bytes
-        val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
-        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-
-        val (startKey, stopKey) =
-          if (queryParam.intervalOpt.isDefined) {
-            // interval is set.
-            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Base64.getDecoder.decode(cursor)
-              case None => Bytes.add(baseKey, intervalMaxBytes)
-            }
-            (_startKey , Bytes.add(baseKey, intervalMinBytes))
-          } else {
-            /*
-             * note: since propsToBytes encode size of property map at first byte, we are sure about max value here
-             */
-            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Base64.getDecoder.decode(cursor)
-              case None => baseKey
-            }
-            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
-          }
-
-        scanner.setStartKey(startKey)
-        scanner.setStopKey(stopKey)
-
-        if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
-
-        scanner.setMaxVersions(1)
-        // TODO: exclusive condition innerOffset with cursorOpt
-        if (queryParam.cursorOpt.isDefined) {
-          scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit))
-        } else {
-          scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + queryParam.innerLimit))
-        }
-        scanner.setMaxTimestamp(maxTs)
-        scanner.setMinTimestamp(minTs)
-        scanner.setRpcTimeout(queryParam.rpcTimeout)
-
-        // SET option for this rpc properly.
-        if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit))
-        else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit))
-
-      case _ =>
-        val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
-          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, serializer.toQualifier)
-        } else {
-          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
-        }
-
-        get.maxVersions(1)
-        get.setFailfast(true)
-        get.setMinTimestamp(minTs)
-        get.setMaxTimestamp(maxTs)
-        get.setTimeout(queryParam.rpcTimeout)
-
-        val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset)
-        val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
-          new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
-        }
-        get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL))
-        Left(get)
-    }
-  }
-
-  /**
-    *
-    * @param queryRequest
-    * @param vertex
-    * @return
-    */
-  private def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike) = {
-    val kvs = serDe.vertexSerializer(vertex).toKeyValues
-    val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf)
-    //      get.setTimeout(this.singleGetTimeout.toShort)
-    get.setFailfast(true)
-    get.maxVersions(1)
-
-    Left(get)
-  }
-
-  override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = {
-    val rpc = buildRequest(queryRequest, edge)
-    fetchKeyValues(rpc)
-  }
-
-  override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext) = {
-    val rpc = buildRequest(queryRequest, vertex)
-    fetchKeyValues(rpc)
-  }
-
-  /**
-    * responsible to fire parallel fetch call into storage and create future that will return merged result.
-    *
-    * @param queryRequests
-    * @param prevStepEdges
-    * @return
-    */
-  override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext) = {
-    val defers: Seq[Deferred[StepResult]] = for {
-      queryRequest <- queryRequests
-    } yield {
-      val queryOption = queryRequest.query.queryOption
-      val queryParam = queryRequest.queryParam
-      val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
-      val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
-      fetch(queryRequest, isInnerCall = false, parentEdges)
-    }
-
-    val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers)
-    grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] =>
-      queryResults.toSeq
-    }.toFuture(emptyStepResult)
-  }
-
-  def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = {
-    val defer = fetchKeyValuesInner(rpc)
-    defer.toFuture(emptyKeyValues).map { kvsArr =>
-      kvsArr.map { kv =>
-        implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-      }
-    }
-  }
-
-  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
-    val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
-      val distinctLabels = labels.toSet
-      val scan = AsynchbasePatcher.newScanner(client, hTableName)
-      scan.setFamily(Serializable.edgeCf)
-      scan.setMaxVersions(1)
-
-      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
-        case null => Seq.empty
-        case kvsLs =>
-          kvsLs.flatMap { kvs =>
-            kvs.flatMap { kv =>
-              val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-
-              serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
-                .fromKeyValues(Seq(kv), None)
-                .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
-            }
-          }
-      }
-    }
-
-    Future.sequence(futures).map(_.flatten)
-  }
-
-  override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
-    val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) =>
-      val distinctColumns = columns.toSet
-      val scan = AsynchbasePatcher.newScanner(client, hTableName)
-      scan.setFamily(Serializable.vertexCf)
-      scan.setMaxVersions(1)
-
-      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
-        case null => Seq.empty
-        case kvsLs =>
-          kvsLs.flatMap { kvs =>
-            serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs, None)
-              .filter(v => distinctColumns(v.serviceColumn))
-          }
-      }
-    }
-    Future.sequence(futures).map(_.flatten)
-  }
-
-
-  /**
-    * we are using future cache to squash requests into same key on storage.
-    *
-    * @param queryRequest
-    * @param isInnerCall
-    * @param parentEdges
-    * @return we use Deferred here since it has much better performrance compared to scala.concurrent.Future.
-    *         seems like map, flatMap on scala.concurrent.Future is slower than Deferred's addCallback
-    */
-  private def fetch(queryRequest: QueryRequest,
-                    isInnerCall: Boolean,
-                    parentEdges: Seq[EdgeWithScore])(implicit ec: ExecutionContext): Deferred[StepResult] = {
-
-    def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = {
-      val prevStepScore = queryRequest.prevStepScore
-      val fallbackFn: (Exception => StepResult) = { ex =>
-        logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
-        StepResult.Failure
-      }
-
-      val queryParam = queryRequest.queryParam
-      fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs =>
-        val (startOffset, len) = queryParam.label.schemaVersion match {
-          case HBaseType.VERSION4 =>
-            val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset
-            (offset, queryParam.limit)
-          case _ => (0, kvs.length)
-        }
-
-        io.toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len)
-      }
-    }
-
-    val queryParam = queryRequest.queryParam
-    val cacheTTL = queryParam.cacheTTLInMillis
-    /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
-
-    val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
-    val request = buildRequest(queryRequest, edge)
-
-    val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
-    val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes)
-
-    if (cacheTTL <= 0) fetchInner(request)
-    else {
-      val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey)
-
-      //      val cacheKeyBytes = toCacheKeyBytes(request)
-      val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
-      futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
-    }
-  }
-
-  /**
-    * Private Methods which is specific to Asynchbase implementation.
-    */
-  private def fetchKeyValuesInner(rpc: AsyncRPC)(implicit ec: ExecutionContext): Deferred[util.ArrayList[KeyValue]] = {
-    rpc match {
-      case Left(get) => client.get(get)
-      case Right(ScanWithRange(scanner, offset, limit)) =>
-        val fallbackFn: (Exception => util.ArrayList[KeyValue]) = { ex =>
-          logger.error(s"fetchKeyValuesInner failed.", ex)
-          scanner.close()
-          emptyKeyValues
-        }
-
-        scanner.nextRows().mapWithFallback(new util.ArrayList[util.ArrayList[KeyValue]]())(fallbackFn) { kvsLs =>
-          val ls = new util.ArrayList[KeyValue]
-          if (kvsLs == null) {
-          } else {
-            kvsLs.foreach { kvs =>
-              if (kvs != null) kvs.foreach { kv => ls.add(kv) }
-              else {
-
-              }
-            }
-          }
-
-          scanner.close()
-          val toIndex = Math.min(ls.size(), offset + limit)
-          new util.ArrayList[KeyValue](ls.subList(offset, toIndex))
-        }
-      case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc"))
-    }
-  }
-
-  private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = {
-    /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
-    hbaseRpc match {
-      case Left(getRequest) => getRequest.key
-      case Right(ScanWithRange(scanner, offset, limit)) =>
-        Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit)))
-      case _ =>
-        logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc")
-        throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
deleted file mode 100644
index b4236b9..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.hbase
-
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.S2GraphLike
-import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.utils.{Extensions, logger}
-import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
-
-class AsynchbaseStorageWritable(val graph: S2GraphLike,
-                                val serDe: StorageSerDe,
-                                val reader: StorageReadable,
-                                val client: HBaseClient,
-                                val clientWithFlush: HBaseClient) extends DefaultOptimisticMutator(graph, serDe, reader) {
-  import Extensions.DeferOps
-
-  private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
-  /**
-    * decide how to store given key values Seq[SKeyValue] into storage using storage's client.
-    * note that this should be return true on all success.
-    * we assumes that each storage implementation has client as member variable.
-    *
-    * @param cluster  : where this key values should be stored.
-    * @param kvs      : sequence of SKeyValue that need to be stored in storage.
-    * @param withWait : flag to control wait ack from storage.
-    *                 note that in AsynchbaseStorage(which support asynchronous operations), even with true,
-    *                 it never block thread, but rather submit work and notified by event loop when storage send ack back.
-    * @return ack message from storage.
-    */
-  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
-    if (kvs.isEmpty) Future.successful(MutateResponse.Success)
-    else {
-      val _client = client(withWait)
-      val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment)
-
-      /* Asynchbase IncrementRequest does not implement HasQualifiers */
-      val incrementsFutures = increments.map { kv =>
-        val countVal = Bytes.toLong(kv.value)
-        val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, countVal)
-        val fallbackFn: (Exception => MutateResponse) = { ex =>
-          logger.error(s"mutation failed. $request", ex)
-          new IncrementResponse(false, -1L, -1L)
-        }
-        val future = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long =>
-          new IncrementResponse(true, resultCount.longValue(), countVal)
-        }.toFuture(MutateResponse.IncrementFailure)
-
-        if (withWait) future else Future.successful(MutateResponse.IncrementSuccess)
-      }
-
-      /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
-      val othersFutures = putAndDeletes.groupBy { kv =>
-        (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
-      }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) =>
-
-        val durability = groupedKeyValues.head.durability
-        val qualifiers = new ArrayBuffer[Array[Byte]]()
-        val values = new ArrayBuffer[Array[Byte]]()
-
-        groupedKeyValues.foreach { kv =>
-          if (kv.qualifier != null) qualifiers += kv.qualifier
-          if (kv.value != null) values += kv.value
-        }
-        val defer = operation match {
-          case SKeyValue.Put =>
-            val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp)
-            put.setDurable(durability)
-            _client.put(put)
-          case SKeyValue.Delete =>
-            val delete =
-              if (qualifiers.isEmpty)
-                new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp)
-              else
-                new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp)
-            delete.setDurable(durability)
-            _client.delete(delete)
-        }
-        if (withWait) {
-          defer.toFuture(new AnyRef()).map(_ => MutateResponse.Success).recover { case ex: Exception =>
-            groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) }
-            MutateResponse.Failure
-          }
-        } else Future.successful(MutateResponse.Success)
-      }
-      for {
-        incrementRets <- Future.sequence(incrementsFutures)
-        otherRets <- Future.sequence(othersFutures)
-      } yield new MutateResponse(isSuccess = (incrementRets ++ otherRets).forall(_.isSuccess))
-    }
-  }
-
-  /**
-    * write requestKeyValue into storage if the current value in storage that is stored matches.
-    * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge.
-    *
-    * Most important thing is this have to be 'atomic' operation.
-    * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be
-    * either blocked or failed on write-write conflict case.
-    *
-    * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to
-    * prevent wrong data for read.
-    *
-    * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction,
-    * compareAndSet to synchronize.
-    *
-    * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
-    * for storage that does not support concurrency control, then storage implementation
-    * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues)
-    * and write(writeLock).
-    *
-    * @param rpc
-    * @param expectedOpt
-    * @return
-    */
-  override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] = {
-    val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp)
-    val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
-    client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true)
-      .map(r => new MutateResponse(isSuccess = r))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
new file mode 100644
index 0000000..e6bf4e6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.storage.serde.Serializable
+import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.utils.Extensions
+import org.apache.s2graph.core.{S2Graph, S2GraphLike, VertexBulkFetcher}
+import org.hbase.async.HBaseClient
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+  * VertexBulkFetcher implementation that reads vertices through Asynchbase scanners.
+  *
+  * Only `client` and `serDe` are referenced by the code in this class body; the remaining
+  * constructor values are exposed as public vals.
+  */
+class AsynchbaseVertexBulkFetcher(val graph: S2GraphLike,
+                                  val config: Config,
+                                  val client: HBaseClient,
+                                  val serDe: StorageSerDe,
+                                  val io: StorageIO) extends VertexBulkFetcher {
+
+  import AsynchbaseStorage._
+  import Extensions.DeferOps
+
+  import scala.collection.JavaConverters._
+
+  /**
+    * Scans the vertex column family of every table referenced by a registered ServiceColumn
+    * and deserializes the returned rows into vertices.
+    *
+    * One scanner is opened per distinct `service.hTableName`, and exactly one
+    * `nextRows(S2Graph.FetchAllLimit)` batch is requested from each scanner. Vertices whose
+    * service column is not among the columns registered for that table are dropped.
+    *
+    * @param ec execution context used to transform the per-table futures.
+    * @return future of all vertices decoded from the scanned batches, flattened across tables.
+    */
+  override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
+    val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) =>
+      val distinctColumns = columns.toSet
+      val scan = AsynchbasePatcher.newScanner(client, hTableName)
+      scan.setFamily(Serializable.vertexCf)
+      scan.setMaxVersions(1)
+
+      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).andThen {
+        // Only a single batch is consumed, so the scanner is released as soon as that batch
+        // completes, whether it succeeded or failed; otherwise it is never closed.
+        case _ => scan.close()
+      }.map {
+        // a null batch means the scanner had no rows to return.
+        case null => Seq.empty
+        case kvsLs =>
+          kvsLs.asScala.flatMap { kvs =>
+            serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs.asScala, None)
+              .filter(v => distinctColumns(v.serviceColumn))
+          }
+      }
+    }
+    Future.sequence(futures).map(_.flatten)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
new file mode 100644
index 0000000..560dd2b
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.hbase.async.HBaseClient
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+  * VertexFetcher implementation that looks vertices up one by one through Asynchbase.
+  *
+  * Only `client` and `serDe` are referenced by the code in this class body; the remaining
+  * constructor values are exposed as public vals.
+  */
+class AsynchbaseVertexFetcher(val graph: S2GraphLike,
+                              val config: Config,
+                              val client: HBaseClient,
+                              val serDe: StorageSerDe,
+                              val io: StorageIO) extends VertexFetcher  {
+  import AsynchbaseStorage._
+  import org.apache.s2graph.core.utils.logger
+
+  import scala.util.control.NonFatal
+
+  /**
+    * Builds the storage request for a single vertex and fetches its raw key values.
+    *
+    * @param queryRequest request wrapper handed to `buildRequest`.
+    * @param vertex       vertex whose stored key values should be read.
+    * @return future of the key values returned by `AsynchbaseStorage.fetchKeyValues`.
+    */
+  private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+    val rpc = buildRequest(serDe, queryRequest, vertex)
+    AsynchbaseStorage.fetchKeyValues(client, rpc)
+  }
+
+  /**
+    * Fetches the stored representation of each given vertex.
+    *
+    * Every vertex is looked up independently. A lookup that returns no key values, or that
+    * fails, contributes nothing to the result, so the returned sequence may be shorter than
+    * the input. Failures are logged instead of being dropped silently.
+    *
+    * @param vertices vertices to look up.
+    * @param ec       execution context used to transform the per-vertex futures.
+    * @return future of all vertices that could be deserialized, flattened across lookups.
+    */
+  override def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
+    def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
+      if (kvs.isEmpty) Nil
+      else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+    }
+
+    val futures = vertices.map { vertex =>
+      val queryParam = QueryParam.Empty
+      val q = Query.toQuery(Seq(vertex), Seq(queryParam))
+      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+
+      fetchKeyValues(queryRequest, vertex).map { kvs =>
+        fromResult(kvs, vertex.serviceColumn.schemaVersion)
+      } recover {
+        // best-effort: one failing lookup must not fail the whole batch, but leave a trace of it.
+        case NonFatal(ex) =>
+          logger.error(s"fetchVertices failed. fallback to empty result. $vertex", ex)
+          Nil
+      }
+    }
+
+    Future.sequence(futures).map(_.flatten)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
new file mode 100644
index 0000000..2ca4b35
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2GraphLike}
+import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
+import org.rocksdb.RocksDB
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+  * EdgeBulkFetcher implementation that reads every edge by iterating over a RocksDB instance.
+  *
+  * Only `db` and `serDe` are referenced by the code in this class body; the remaining
+  * constructor values are exposed as public vals.
+  */
+class RocksEdgeBulkFetcher(val graph: S2GraphLike,
+                           val config: Config,
+                           val db: RocksDB,
+                           val vdb: RocksDB,
+                           val serDe: StorageSerDe,
+                           val io: StorageIO) extends EdgeBulkFetcher  {
+  import RocksStorage._
+
+  /**
+    * Iterates over `db` from its first key and collects every stored index edge that belongs
+    * to a registered label, has direction "out" and is not a degree edge.
+    *
+    * The database is walked once and filtered against all registered labels, rather than
+    * once per HBase table name, because the table name never influenced the scan. Edges are
+    * therefore returned in iteration order of `db`.
+    *
+    * @param ec execution context required by the interface; the scan itself runs synchronously
+    *           on the calling thread.
+    * @return an already-completed future holding the collected edges.
+    */
+  override def fetchEdgesAll()(implicit ec: ExecutionContext) = {
+    val edges = new ArrayBuffer[S2EdgeLike]()
+    val distinctLabels = Label.findAll().toSet
+
+    // without any registered label nothing can pass the filter, so skip the scan entirely.
+    if (distinctLabels.nonEmpty) {
+      val iter = db.newIterator()
+      try {
+        iter.seekToFirst()
+        while (iter.isValid) {
+          // `table` and `qualifier` are not defined in this class; presumably they come from the
+          // RocksStorage._ import -- TODO confirm.
+          val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis())
+
+          serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
+            .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
+            .foreach { edge =>
+              edges += edge
+            }
+
+          iter.next()
+        }
+      } finally {
+        // always release the native iterator, even when deserialization throws.
+        iter.close()
+      }
+    }
+
+    Future.successful(edges)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala
new file mode 100644
index 0000000..628c5e1
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.VertexId
+import org.rocksdb.RocksDB
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksEdgeFetcher(val graph: S2GraphLike,
+                       val config: Config,
+                       val db: RocksDB,
+                       val vdb: RocksDB,
+                       val serDe: StorageSerDe,
+                       val io: StorageIO) extends EdgeFetcher  {
+  import RocksStorage._
+  // Issues one RocksDB request per QueryRequest and keeps only edges with duration._1 <= ts < duration._2.
+  override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
+    val futures = for {
+      queryRequest <- queryRequests
+    } yield {
+      val parentEdges = prevStepEdges.getOrElse(queryRequest.vertex.id, Nil)
+      val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
+      val rpc = buildRequest(graph, serDe, queryRequest, edge)
+      fetchKeyValues(vdb, db, rpc).map { kvs =>
+        val stepResult = io.toEdges(kvs, queryRequest, queryRequest.prevStepScore, false, parentEdges)
+        // The window is identical for every edge of this request, so resolve it once instead of per edge.
+        val duration = queryRequest.queryParam.durationOpt.getOrElse((Long.MinValue, Long.MaxValue))
+        val edgeWithScores = stepResult.edgeWithScores.filter { edgeWithScore =>
+          val ts = edgeWithScore.edge.ts
+          ts >= duration._1 && ts < duration._2
+        }
+
+        stepResult.copy(edgeWithScores = edgeWithScores)
+      }
+    }
+
+    Future.sequence(futures)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala
new file mode 100644
index 0000000..6513442
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.{QueryRequest, S2EdgeLike, S2GraphLike}
+import org.apache.s2graph.core.storage.{OptimisticEdgeFetcher, SKeyValue, StorageIO, StorageSerDe}
+import org.rocksdb.RocksDB
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksOptimisticEdgeFetcher(val graph: S2GraphLike,
+                                 val config: Config,
+                                 val db: RocksDB,
+                                 val vdb: RocksDB,
+                                 val serDe: StorageSerDe,
+                                 val io: StorageIO) extends OptimisticEdgeFetcher {
+  // Delegates to the RocksStorage helpers: build the request for (queryRequest, edge), then run it.
+  override protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+    RocksStorage.fetchKeyValues(
+      vdb, db, RocksStorage.buildRequest(graph, serDe, queryRequest, edge)
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala
new file mode 100644
index 0000000..ec77d3f
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import java.util.concurrent.locks.ReentrantLock
+
+import com.google.common.cache.{Cache, LoadingCache}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.S2GraphLike
+import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.utils.logger
+import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksOptimisticMutator(val graph: S2GraphLike,
+                             val serDe: StorageSerDe,
+                             val optimisticEdgeFetcher: OptimisticEdgeFetcher,
+                             val db: RocksDB,
+                             val vdb: RocksDB,
+                             val lockMap: LoadingCache[String, ReentrantLock]) extends OptimisticMutator {
+  // Splits kvs by column family, then writes the vertex batch to vdb and the edge batch to db (two separate writes).
+  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
+    if (kvs.isEmpty) {
+      Future.successful(MutateResponse.Success)
+    } else {
+      val ret = {
+        val (kvsV, kvsE) = kvs.partition(kv => Bytes.equals(kv.cf, SKeyValue.VertexCf))
+        val writeBatchV = buildWriteBatch(kvsV)
+        val writeBatchE = buildWriteBatch(kvsE)
+        val writeOptions = new WriteOptions
+        try {
+          vdb.write(writeOptions, writeBatchV)
+          db.write(writeOptions, writeBatchE)
+          true
+        } catch {
+          case e: Exception =>
+            logger.error(s"writeAsyncSimple failed.", e)
+            false
+        } finally {
+          writeBatchV.close()
+          writeBatchE.close()
+          writeOptions.close()
+        }
+      }
+
+      Future.successful(new MutateResponse(ret))
+    }
+  }
+
+  // Compare-and-set on db under a per-row lock: None expects the row to be absent, Some(kv) expects its value to equal kv.value.
+  override def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext) = {
+    def op = {
+      val writeOptions = new WriteOptions
+      try {
+        val fetchedValue = db.get(requestKeyValue.row)
+        val innerRet = expectedOpt match {
+          case None =>
+            if (fetchedValue == null) {
+              // Nothing stored under this row yet, which is what the caller expected: write the new value.
+              db.put(writeOptions, requestKeyValue.row, requestKeyValue.value)
+              true
+            } else {
+              false
+            }
+          case Some(kv) =>
+            if (fetchedValue == null) {
+              false
+            } else {
+              if (Bytes.compareTo(fetchedValue, kv.value) == 0) {
+                db.put(writeOptions, requestKeyValue.row, requestKeyValue.value)
+                true
+              } else {
+                false
+              }
+            }
+        }
+
+        Future.successful(new MutateResponse(innerRet))
+      } catch {
+        case e: RocksDBException =>
+          logger.error(s"Write lock failed", e)
+          Future.successful(MutateResponse.Failure)
+      } finally {
+        writeOptions.close()
+      }
+    }
+
+    withLock(requestKeyValue.row)(op)
+  }
+  // Translates kvs into one WriteBatch (Put -> put, Delete -> remove, Increment -> merge); the caller closes it.
+  private def buildWriteBatch(kvs: Seq[SKeyValue]): WriteBatch = {
+    val writeBatch = new WriteBatch()
+    kvs.foreach { kv =>
+      kv.operation match {
+        case SKeyValue.Put => writeBatch.put(kv.row, kv.value)
+        case SKeyValue.Delete => writeBatch.remove(kv.row)
+        case SKeyValue.Increment => writeBatch.merge(kv.row, kv.value)
+        case _ => throw new RuntimeException(s"not supported rpc operation. ${kv.operation}")
+      }
+    }
+    writeBatch
+  }
+  // Evaluates op while holding the ReentrantLock that lockMap returns for the stringified key.
+  private def withLock[A](key: Array[Byte])(op: => A): A = {
+    val lockKey = Bytes.toString(key)
+    val lock = lockMap.get(lockKey)
+    // Acquire before entering `try`: if acquisition fails, `finally` must not unlock a lock we never held.
+    lock.lock()
+    try {
+      op
+    } finally {
+      lock.unlock()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
index b24e375..8948e13 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
@@ -19,22 +19,30 @@
 
 package org.apache.s2graph.core.storage.rocks
 
+import java.util.Base64
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.hash.Hashing
 import com.typesafe.config.Config
+import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.{Storage, StorageManagement, StorageReadable, StorageSerDe}
-import org.apache.s2graph.core.storage.rocks.RocksHelper.RocksRPC
+import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange}
+import org.apache.s2graph.core.storage.serde.StorageSerializable
+import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.utils.logger
 import org.rocksdb._
 import org.rocksdb.util.SizeUnit
 
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Random, Try}
 
 object RocksStorage {
+  val table = Array.emptyByteArray
+  val qualifier = Array.emptyByteArray
 
   RocksDB.loadLibrary()
 
@@ -129,6 +137,84 @@ object RocksStorage {
         throw e
     }
   }
+
+  def buildRequest(graph: S2GraphLike, serDe: StorageSerDe, queryRequest: QueryRequest, edge: S2EdgeLike): RocksRPC = {
+    queryRequest.queryParam.tgtVertexInnerIdOpt match {
+      case None => // indexEdges: range scan over the keys of the index selected by labelOrderSeq
+        val queryParam = queryRequest.queryParam
+        val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
+        val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam"))
+        val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+        val labelWithDirBytes = indexEdge.labelWithDir.bytes
+        val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+        // Common key prefix for this scan: source vertex id + label/direction + index sequence.
+        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+        // With an interval the scan runs from prefix+max to prefix+min; a cursor, when given, replaces the start key.
+        val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
+        val (startKey, stopKey) =
+          if (queryParam.intervalOpt.isDefined) {
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
+              case None => Bytes.add(baseKey, intervalMaxBytes)
+            }
+            (_startKey, Bytes.add(baseKey, intervalMinBytes))
+          } else {
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
+              case None => baseKey
+            }
+            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
+          }
+
+        Right(ScanWithRange(SKeyValue.EdgeCf, startKey, stopKey, queryParam.innerOffset, queryParam.innerLimit))
+
+      case Some(_) => // snapshotEdge: point lookup by the serialized snapshot edge's row key
+        val kv = serDe.snapshotEdgeSerializer(graph.elementBuilder.toRequestEdge(queryRequest, Nil).toSnapshotEdge).toKeyValues.head
+        Left(GetRequest(SKeyValue.EdgeCf, kv.row))
+    }
+  }
+
+  def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike): RocksRPC = {
+    // Vertex-store scan from the vertex id bytes up to those bytes followed by Byte.MaxValue.
+    val idBytes = vertex.id.bytes
+    val upperBound = Bytes.add(idBytes, Array(Byte.MaxValue))
+    Right(ScanWithRange(SKeyValue.VertexCf, idBytes, upperBound, 0, Byte.MaxValue))
+  }
+
+  def fetchKeyValues(vdb: RocksDB, db: RocksDB, rpc: RocksRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+    rpc match {
+      case Left(GetRequest(cf, key)) =>
+        val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
+        val v = _db.get(key)
+        // A null value means the key is not stored; report that as an empty result.
+        val kvs =
+          if (v == null) Seq.empty
+          else Seq(SKeyValue(table, key, cf, qualifier, v, System.currentTimeMillis()))
+
+        Future.successful(kvs)
+      case Right(ScanWithRange(cf, startKey, stopKey, offset, limit)) =>
+        val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
+        val kvs = new ArrayBuffer[SKeyValue]()
+        val iter = _db.newIterator()
+        // Visit keys from startKey through stopKey, skip the first `offset` entries, then keep at most `limit`.
+        try {
+          var idx = 0
+          iter.seek(startKey)
+          // `idx - offset < limit` equals `idx < offset + limit` but cannot overflow for a huge limit (assumes offset >= 0).
+          while (iter.isValid && Bytes.compareTo(iter.key, stopKey) <= 0 && idx - offset < limit) {
+            if (idx >= offset) {
+              kvs += SKeyValue(table, iter.key, cf, qualifier, iter.value, System.currentTimeMillis())
+            }
+
+            iter.next()
+            idx += 1
+          }
+        } finally {
+          iter.close()
+        }
+        Future.successful(kvs)
+    }
+  }
 }
 
 class RocksStorage(override val graph: S2GraphLike,
@@ -150,12 +236,18 @@ class RocksStorage(override val graph: S2GraphLike,
     .maximumSize(1000 * 10 * 10 * 10 * 10)
     .build[String, ReentrantLock](cacheLoader)
 
-  override val management: StorageManagement = new RocksStorageManagement(config, vdb, db)
+  private lazy val optimisticEdgeFetcher = new RocksOptimisticEdgeFetcher(graph, config, db, vdb, serDe, io)
+  private lazy val optimisticMutator = new RocksOptimisticMutator(graph, serDe, optimisticEdgeFetcher, db, vdb, lockMap)
+  private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator)
 
+  override val management: StorageManagement = new RocksStorageManagement(config, vdb, db)
   override val serDe: StorageSerDe = new RocksStorageSerDe(graph)
 
-  override val reader: StorageReadable = new RocksStorageReadable(graph, config, db, vdb, serDe, io)
-
-  override val mutator: Mutator = new RocksStorageWritable(graph, serDe, reader, db, vdb, lockMap)
+  override val edgeFetcher: EdgeFetcher = new RocksEdgeFetcher(graph, config, db, vdb, serDe, io)
+  override val edgeBulkFetcher: EdgeBulkFetcher = new RocksEdgeBulkFetcher(graph, config, db, vdb, serDe, io)
+  override val vertexFetcher: VertexFetcher = new RocksVertexFetcher(graph, config, db, vdb, serDe, io)
+  override val vertexBulkFetcher: VertexBulkFetcher = new RocksVertexBulkFetcher(graph, config, db, vdb, serDe, io)
 
+  override val edgeMutator: EdgeMutator = _mutator
+  override val vertexMutator: VertexMutator = _mutator
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
deleted file mode 100644
index 27e3efd..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.rocks
-
-import java.util.Base64
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.schema.{Label, ServiceColumn}
-import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange}
-import org.apache.s2graph.core.storage.serde.StorageSerializable
-import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.types.{HBaseType, VertexId}
-import org.rocksdb.RocksDB
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
-
-class RocksStorageReadable(val graph: S2GraphLike,
-                           val config: Config,
-                           val db: RocksDB,
-                           val vdb: RocksDB,
-                           val serDe: StorageSerDe,
-                           override val io: StorageIO) extends StorageReadable {
-
-  private val table = Array.emptyByteArray
-  private val qualifier = Array.emptyByteArray
-
-  private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike): RocksRPC = {
-    queryRequest.queryParam.tgtVertexInnerIdOpt match {
-      case None => // indexEdges
-        val queryParam = queryRequest.queryParam
-        val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
-        val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam"))
-        val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
-        val labelWithDirBytes = indexEdge.labelWithDir.bytes
-        val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
-
-        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-
-        val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
-        val (startKey, stopKey) =
-          if (queryParam.intervalOpt.isDefined) {
-            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Base64.getDecoder.decode(cursor)
-              case None => Bytes.add(baseKey, intervalMaxBytes)
-            }
-            (_startKey, Bytes.add(baseKey, intervalMinBytes))
-          } else {
-            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Base64.getDecoder.decode(cursor)
-              case None => baseKey
-            }
-            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
-          }
-
-        Right(ScanWithRange(SKeyValue.EdgeCf, startKey, stopKey, queryParam.innerOffset, queryParam.innerLimit))
-
-      case Some(tgtId) => // snapshotEdge
-        val kv = serDe.snapshotEdgeSerializer(graph.elementBuilder.toRequestEdge(queryRequest, Nil).toSnapshotEdge).toKeyValues.head
-        Left(GetRequest(SKeyValue.EdgeCf, kv.row))
-    }
-  }
-
-  private def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike): RocksRPC = {
-    val startKey = vertex.id.bytes
-    val stopKey = Bytes.add(startKey, Array.fill(1)(Byte.MaxValue))
-
-    Right(ScanWithRange(SKeyValue.VertexCf, startKey, stopKey, 0, Byte.MaxValue))
-//    val kv = serDe.vertexSerializer(vertex).toKeyValues.head
-//    Left(GetRequest(SKeyValue.VertexCf, kv.row))
-  }
-
-  override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
-    val futures = for {
-      queryRequest <- queryRequests
-    } yield {
-      val parentEdges = prevStepEdges.getOrElse(queryRequest.vertex.id, Nil)
-      val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
-      val rpc = buildRequest(queryRequest, edge)
-      fetchKeyValues(rpc).map { kvs =>
-        val queryParam = queryRequest.queryParam
-        val stepResult = io.toEdges(kvs, queryRequest, queryRequest.prevStepScore, false, parentEdges)
-        val edgeWithScores = stepResult.edgeWithScores.filter { case edgeWithScore =>
-          val edge = edgeWithScore.edge
-          val duration = queryParam.durationOpt.getOrElse((Long.MinValue, Long.MaxValue))
-          edge.ts >= duration._1 && edge.ts < duration._2
-        }
-
-        stepResult.copy(edgeWithScores = edgeWithScores)
-      }
-    }
-
-    Future.sequence(futures)
-  }
-
-  private def fetchKeyValues(rpc: RocksRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
-    rpc match {
-      case Left(GetRequest(cf, key)) =>
-        val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
-        val v = _db.get(key)
-
-        val kvs =
-          if (v == null) Seq.empty
-          else Seq(SKeyValue(table, key, cf, qualifier, v, System.currentTimeMillis()))
-
-        Future.successful(kvs)
-      case Right(ScanWithRange(cf, startKey, stopKey, offset, limit)) =>
-        val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
-        val kvs = new ArrayBuffer[SKeyValue]()
-        val iter = _db.newIterator()
-
-        try {
-          var idx = 0
-          iter.seek(startKey)
-          val (startOffset, len) = (offset, limit)
-          while (iter.isValid && Bytes.compareTo(iter.key, stopKey) <= 0 && idx < startOffset + len) {
-            if (idx >= startOffset) {
-              kvs += SKeyValue(table, iter.key, cf, qualifier, iter.value, System.currentTimeMillis())
-            }
-
-            iter.next()
-            idx += 1
-          }
-        } finally {
-          iter.close()
-        }
-        Future.successful(kvs)
-    }
-  }
-
-  override def fetchEdgesAll()(implicit ec: ExecutionContext) = {
-    val edges = new ArrayBuffer[S2EdgeLike]()
-    Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) =>
-      val distinctLabels = labels.toSet
-
-      val iter = db.newIterator()
-      try {
-        iter.seekToFirst()
-        while (iter.isValid) {
-          val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis())
-
-          serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
-            .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
-            .foreach { edge =>
-              edges += edge
-            }
-
-
-          iter.next()
-        }
-
-      } finally {
-        iter.close()
-      }
-    }
-
-    Future.successful(edges)
-  }
-
-  override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
-    import scala.collection.mutable
-
-    val vertices = new ArrayBuffer[S2VertexLike]()
-    ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) =>
-      val distinctColumns = columns.toSet
-
-      val iter = vdb.newIterator()
-      val buffer = mutable.ListBuffer.empty[SKeyValue]
-      var oldVertexIdBytes = Array.empty[Byte]
-      var minusPos = 0
-
-      try {
-        iter.seekToFirst()
-        while (iter.isValid) {
-          val row = iter.key()
-          if (!Bytes.equals(oldVertexIdBytes, 0, oldVertexIdBytes.length - minusPos, row, 0, row.length - 1)) {
-            if (buffer.nonEmpty)
-              serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None)
-              .filter(v => distinctColumns(v.serviceColumn))
-              .foreach { vertex =>
-                vertices += vertex
-              }
-
-            oldVertexIdBytes = row
-            minusPos = 1
-            buffer.clear()
-          }
-          val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis())
-          buffer += kv
-
-          iter.next()
-        }
-        if (buffer.nonEmpty)
-          serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None)
-          .filter(v => distinctColumns(v.serviceColumn))
-          .foreach { vertex =>
-            vertices += vertex
-          }
-
-      } finally {
-        iter.close()
-      }
-    }
-
-    Future.successful(vertices)
-  }
-
-  override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = {
-    fetchKeyValues(buildRequest(queryRequest, edge))
-  }
-
-  override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext) = {
-    fetchKeyValues(buildRequest(queryRequest, vertex))
-  }
-}