You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/10/30 10:23:34 UTC

[6/8] incubator-s2graph git commit: remove type parameter on Storage Trait (#13)

remove type parameter on Storage Trait (#13)



Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/55d194ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/55d194ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/55d194ed

Branch: refs/heads/master
Commit: 55d194ed8edfb0ec78ce5723c379c20fa7462632
Parents: f416194
Author: daewon <da...@apache.org>
Authored: Mon Oct 30 17:07:48 2017 +0900
Committer: Doyung Yoon <st...@apache.org>
Committed: Mon Oct 30 10:07:48 2017 +0200

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/core/S2Graph.scala | 54 ++++++--------------
 .../apache/s2graph/core/storage/SKeyValue.scala | 13 +++--
 .../apache/s2graph/core/storage/Storage.scala   | 16 ++----
 .../s2graph/core/storage/StorageReadable.scala  | 30 +++++------
 .../storage/WriteWriteConflictResolver.scala    |  6 +--
 .../core/storage/hbase/AsynchbaseStorage.scala  |  5 +-
 .../hbase/AsynchbaseStorageReadable.scala       | 46 ++++++++++++++---
 7 files changed, 84 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 92f68dc..34db9e4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -133,7 +133,7 @@ object S2Graph {
     new S2Graph(configuration)(ec)
   }
 
-  def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_] = {
+  def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage = {
     val storageBackend = config.getString("s2graph.storage.backend")
     logger.info(s"[InitStorage]: $storageBackend")
 
@@ -908,7 +908,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   /**
     * TODO: we need to some way to handle malformed configuration for storage.
     */
-  val storagePool: scala.collection.mutable.Map[String, Storage[_]] = {
+  val storagePool: scala.collection.mutable.Map[String, Storage] = {
     val labels = Label.findAll()
     val services = Service.findAll()
 
@@ -919,12 +919,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
       confWithFallback(conf)
     }.toSet
 
-    val pools = new java.util.HashMap[Config, Storage[_]]()
+    val pools = new java.util.HashMap[Config, Storage]()
     configs.foreach { config =>
       pools.put(config, S2Graph.initStorage(this, config)(ec))
     }
 
-    val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_]]()
+    val m = new java.util.concurrent.ConcurrentHashMap[String, Storage]()
 
     labels.foreach { label =>
       if (label.storageConfigOpt.isDefined) {
@@ -941,7 +941,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     m
   }
 
-  val defaultStorage: Storage[_] = S2Graph.initStorage(this, config)(ec)
+  val defaultStorage: Storage = S2Graph.initStorage(this, config)(ec)
 
   /** QueryLevel FutureCache */
   val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty)
@@ -953,11 +953,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
 
   val indexProvider = IndexProvider.apply(config)
 
-  def getStorage(service: Service): Storage[_] = {
+  def getStorage(service: Service): Storage = {
     storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
   }
 
-  def getStorage(label: Label): Storage[_] = {
+  def getStorage(label: Label): Storage = {
     storagePool.getOrElse(s"label:${label.label}", defaultStorage)
   }
 
@@ -975,7 +975,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     val futures = for {
       edge <- edges
     } yield {
-      getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (_, edgeOpt, _) =>
+      getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (edgeOpt, _) =>
         edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, edge.innerLabel))
       }
     }
@@ -1145,31 +1145,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   }
 
   def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
-    def getVertices[Q](storage: Storage[Q])(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
-      def fromResult(kvs: Seq[SKeyValue],
-                     version: String): Option[S2Vertex] = {
-        if (kvs.isEmpty) None
-        else storage.vertexDeserializer(version).fromKeyValues(kvs, None)
-        //        .map(S2Vertex(graph, _))
-      }
-
-      val futures = vertices.map { vertex =>
-        val queryParam = QueryParam.Empty
-        val q = Query.toQuery(Seq(vertex), Seq(queryParam))
-        val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-        val rpc = storage.buildRequest(queryRequest, vertex)
-        storage.fetchKeyValues(rpc).map { kvs =>
-          fromResult(kvs, vertex.serviceColumn.schemaVersion)
-        } recoverWith { case ex: Throwable =>
-          Future.successful(None)
-        }
-      }
-
-      Future.sequence(futures).map { result => result.toList.flatten }
-    }
     val verticesWithIdx = vertices.zipWithIndex
     val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
-      getVertices(getStorage(service))(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
+      getStorage(service).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2)))
     }
 
     Future.sequence(futures).map { ls =>
@@ -1272,7 +1250,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     }
   }
 
-  private def deleteAllFetchedEdgesAsyncOld(storage: Storage[_])(stepInnerResult: StepResult,
+  private def deleteAllFetchedEdgesAsyncOld(storage: Storage)(stepInnerResult: StepResult,
                                                          requestTs: Long,
                                                          retryNum: Int): Future[Boolean] = {
     if (stepInnerResult.isEmpty) Future.successful(true)
@@ -1426,7 +1404,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     }
   }
 
-  private def mutateStrongEdges(storage: Storage[_])(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = {
+  private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = {
 
     val edgeWithIdxs = _edges.zipWithIndex
     val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
@@ -1462,7 +1440,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   }
 
 
-  private def mutateEdgesInner(storage: Storage[_])(edges: Seq[S2Edge],
+  private def mutateEdgesInner(storage: Storage)(edges: Seq[S2Edge],
                        checkConsistency: Boolean,
                        withWait: Boolean): Future[MutateResponse] = {
     assert(edges.nonEmpty)
@@ -1483,14 +1461,14 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
       }
       Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
     } else {
-      storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+      storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
         storage.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
       }
     }
   }
 
   def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
-    def mutateVertex(storage: Storage[_])(vertex: S2Vertex, withWait: Boolean): Future[MutateResponse] = {
+    def mutateVertex(storage: Storage)(vertex: S2Vertex, withWait: Boolean): Future[MutateResponse] = {
       if (vertex.op == GraphUtil.operations("delete")) {
         storage.writeToStorage(vertex.hbaseZkAddr,
           storage.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
@@ -1502,7 +1480,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
       }
     }
 
-    def mutateVertices(storage: Storage[_])(vertices: Seq[S2Vertex],
+    def mutateVertices(storage: Storage)(vertices: Seq[S2Vertex],
                        withWait: Boolean = false): Future[Seq[MutateResponse]] = {
       val futures = vertices.map { vertex => mutateVertex(storage)(vertex, withWait) }
       Future.sequence(futures)
@@ -1518,7 +1496,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
 
 
   def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
-    def incrementCounts(storage: Storage[_])(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
+    def incrementCounts(storage: Storage)(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
       val futures = for {
         edge <- edges
       } yield {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index 924d9a3..775afda 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -56,18 +56,17 @@ trait CanSKeyValue[T] {
 }
 
 object CanSKeyValue {
+  def instance[T](f: T => SKeyValue): CanSKeyValue[T] = new CanSKeyValue[T] {
+    override def toSKeyValue(from: T): SKeyValue = f.apply(from)
+  }
 
   // For asyncbase KeyValues
-  implicit val asyncKeyValue = new CanSKeyValue[KeyValue] {
-    def toSKeyValue(kv: KeyValue): SKeyValue = {
-      SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp())
-    }
+  implicit val asyncKeyValue = instance[KeyValue] { kv =>
+    SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp())
   }
 
   // For asyncbase KeyValues
-  implicit val sKeyValue = new CanSKeyValue[SKeyValue] {
-    def toSKeyValue(kv: SKeyValue): SKeyValue = kv
-  }
+  implicit val sKeyValue = instance[SKeyValue](identity)
 
   // For hbase KeyValues
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 2fe6e42..e4eafbf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -27,7 +27,7 @@ import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializa
 import org.apache.s2graph.core.types._
 import scala.concurrent.{ExecutionContext, Future}
 
-abstract class Storage[Q](val graph: S2Graph,
+abstract class Storage(val graph: S2Graph,
                           val config: Config) {
   /* Storage backend specific resource management */
   val management: StorageManagement
@@ -39,7 +39,7 @@ abstract class Storage[Q](val graph: S2Graph,
    * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage
    * then convert them into Edge/Vertex
    */
-  val fetcher: StorageReadable[Q]
+  val fetcher: StorageReadable
 
   /*
    * Serialize Edge/Vertex, to common KeyValue, SKeyValue that
@@ -61,7 +61,6 @@ abstract class Storage[Q](val graph: S2Graph,
    */
   lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher)
 
-
   /** IO **/
   def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] =
     serDe.snapshotEdgeSerializer(snapshotEdge)
@@ -115,22 +114,18 @@ abstract class Storage[Q](val graph: S2Graph,
     mutator.writeLock(requestKeyValue, expectedOpt)
 
   /** Fetch **/
-
-  def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q = fetcher.buildRequest(queryRequest, edge)
-
-  def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex): Q = fetcher.buildRequest(queryRequest, vertex)
-
   def fetches(queryRequests: Seq[QueryRequest],
               prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] =
     fetcher.fetches(queryRequests, prevStepEdges)
 
-  def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = fetcher.fetchKeyValues(rpc)
+  def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] =
+    fetcher.fetchVertices(vertices)
 
   def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = fetcher.fetchEdgesAll()
 
   def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] = fetcher.fetchVerticesAll()
 
-  def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] =
+  def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] =
     fetcher.fetchSnapshotEdgeInner(edge)
 
   /** Conflict Resolver **/
@@ -149,6 +144,5 @@ abstract class Storage[Q](val graph: S2Graph,
 
   def shutdown(): Unit = management.shutdown()
 
-
   def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index 7a0d8ef..03b01fd 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -26,21 +26,9 @@ import org.apache.s2graph.core.utils.logger
 
 import scala.concurrent.{ExecutionContext, Future}
 
-trait StorageReadable[Q] {
+trait StorageReadable {
   val io: StorageIO
-  /**
-    * build proper request which is specific into storage to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
-    * for example, Asynchbase use GetRequest, Scanner so this method is responsible to build
-    * client request(GetRequest, Scanner) based on user provided query.
-    *
-    * @param queryRequest
-    * @return
-    */
-  def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q
-
-  def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex): Q
-
-  /**
+ /**
     * responsible to fire parallel fetch call into storage and create future that will return merged result.
     *
     * @param queryRequests
@@ -50,13 +38,18 @@ trait StorageReadable[Q] {
   def fetches(queryRequests: Seq[QueryRequest],
               prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
 
-  def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+// private def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+  def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext): Future[Seq[S2Vertex]]
 
   def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]]
 
   def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]]
 
-  def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
+  protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+
+  protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2Vertex)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+
+  def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] = {
     val queryParam = QueryParam(labelName = edge.innerLabel.label,
       direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
       tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
@@ -64,15 +57,16 @@ trait StorageReadable[Q] {
     val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
     val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
 
-    fetchKeyValues(buildRequest(queryRequest, edge)).map { kvs =>
+    fetchKeyValues(queryRequest, edge).map { kvs =>
       val (edgeOpt, kvOpt) =
         if (kvs.isEmpty) (None, None)
         else {
+          import CanSKeyValue._
           val snapshotEdgeOpt = io.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
           val _kvOpt = kvs.headOption
           (snapshotEdgeOpt, _kvOpt)
         }
-      (queryParam, edgeOpt, kvOpt)
+      (edgeOpt, kvOpt)
     } recoverWith { case ex: Throwable =>
       logger.error(s"fetchQueryParam failed. fallback return.", ex)
       throw new FetchTimeoutException(s"${edge.toLogString}")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
index 79b764d..227cfa7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
@@ -32,7 +32,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
                                  serDe: StorageSerDe,
                                  io: StorageIO,
                                  mutator: StorageWritable,
-                                 fetcher: StorageReadable[_]) {
+                                 fetcher: StorageReadable) {
 
   val BackoffTimeout = graph.BackoffTimeout
   val MaxRetryNum = graph.MaxRetryNum
@@ -69,7 +69,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
         case FetchTimeoutException(retryEdge) =>
           logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
           /* fetch failed. re-fetch should be done */
-          fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+          fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
             retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
           }
 
@@ -91,7 +91,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
               val future = if (failedStatusCode == 0) {
                 // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
                 /* fetch failed. re-fetch should be done */
-                fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+                fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
                   retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
                 }
               } else {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 54007d5..ef1350a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -27,7 +27,6 @@ import com.typesafe.config.Config
 import org.apache.commons.io.FileUtils
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.AsyncRPC
 import org.apache.s2graph.core.utils._
 import org.hbase.async._
 import org.apache.s2graph.core.storage.serde._
@@ -140,7 +139,7 @@ object AsynchbaseStorage {
 
 
 class AsynchbaseStorage(override val graph: S2Graph,
-                        override val config: Config) extends Storage[AsyncRPC](graph, config) {
+                        override val config: Config) extends Storage(graph, config) {
 
   /**
     * since some runtime environment such as spark cluster has issue with guava version, that is used in Asynchbase.
@@ -156,7 +155,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
 
   override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
 
-  override val fetcher: StorageReadable[AsyncRPC] = new AsynchbaseStorageReadable(graph, config, client, serDe, io)
+  override val fetcher: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io)
 
   //  val hbaseExecutor: ExecutorService  =
   //    if (config.getString("hbase.zookeeper.quorum") == "localhost")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index 1cb6109..0dc8491 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.
+ *
  */
 
 package org.apache.s2graph.core.storage.hbase
@@ -42,7 +42,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
                                 val config: Config,
                                 val client: HBaseClient,
                                 val serDe: StorageSerDe,
-                                override val io: StorageIO) extends StorageReadable[AsyncRPC] {
+                                override val io: StorageIO) extends StorageReadable {
   import Extensions.DeferOps
   import CanDefer._
 
@@ -67,7 +67,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
     * @param queryRequest
     * @return
     */
-  override def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = {
+  private def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = {
     import Serializable._
     val queryParam = queryRequest.queryParam
     val label = queryParam.label
@@ -168,15 +168,26 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
     * @param vertex
     * @return
     */
-  override def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex) = {
+  private def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex) = {
     val kvs = serDe.vertexSerializer(vertex).toKeyValues
     val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf)
     //      get.setTimeout(this.singleGetTimeout.toShort)
     get.setFailfast(true)
     get.maxVersions(1)
+
     Left(get)
   }
 
+  override def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext) = {
+    val rpc = buildRequest(queryRequest, edge)
+    fetchKeyValues(rpc)
+  }
+
+  override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2Vertex)(implicit ec: ExecutionContext) = {
+    val rpc = buildRequest(queryRequest, vertex)
+    fetchKeyValues(rpc)
+  }
+
   /**
     * responsible to fire parallel fetch call into storage and create future that will return merged result.
     *
@@ -201,7 +212,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
     }.toFuture(emptyStepResult)
   }
 
-  override def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = {
+  def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = {
     val defer = fetchKeyValuesInner(rpc)
     defer.toFuture(emptyKeyValues).map { kvsArr =>
       kvsArr.map { kv =>
@@ -224,7 +235,8 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
             kvs.flatMap { kv =>
               val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
 
-              serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
+              serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
+                .fromKeyValues(Seq(kv), None)
                 .filter(e => distinctLabels(e.innerLabel) && e.direction == "out" && !e.isDegree)
             }
           }
@@ -234,6 +246,27 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
     Future.sequence(futures).map(_.flatten)
   }
 
+  override def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext) = {
+    def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2Vertex] = {
+      if (kvs.isEmpty) Nil
+      else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+    }
+
+    val futures = vertices.map { vertex =>
+      val queryParam = QueryParam.Empty
+      val q = Query.toQuery(Seq(vertex), Seq(queryParam))
+      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+
+      fetchKeyValues(queryRequest, vertex).map { kvs =>
+        fromResult(kvs, vertex.serviceColumn.schemaVersion)
+      } recoverWith {
+        case ex: Throwable => Future.successful(Nil)
+      }
+    }
+
+    Future.sequence(futures).map(_.flatten)
+  }
+
   override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
     val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) =>
       val distinctColumns = columns.toSet
@@ -351,4 +384,5 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
         throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc")
     }
   }
+
 }