You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/06/11 15:00:53 UTC

incubator-s2graph git commit: [S2GRAPH-81]: Separate Serializable's toKeyValues into three methods: toRowKey, toQualifier, and toValue. Split toKeyValues into toRowKey, toQualifier, and toValue, so that buildRequest only uses toRowKey and toQualifier.

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 56829adc7 -> 0d1854450


[S2GRAPH-81]: Separate Serializable's toKeyValues into three methods: toRowKey, toQualifier, and toValue
  Split toKeyValues into toRowKey, toQualifier, and toValue, so that buildRequest only uses toRowKey and toQualifier.

JIRA:
  [S2GRAPH-81] https://issues.apache.org/jira/browse/S2GRAPH-81

Pull Request:
  Closes #52

Authors:
  DOYUNG YOON: steamshon@apache.org


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/0d185445
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/0d185445
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/0d185445

Branch: refs/heads/master
Commit: 0d185445056555593b524b02f689aecb9d68906a
Parents: 56829ad
Author: DO YUNG YOON <st...@apache.org>
Authored: Sat Jun 11 23:58:51 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Jun 11 23:58:51 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../core/storage/StorageSerializable.scala      |  18 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  |  19 +-
 .../tall/IndexEdgeDeserializable.scala          | 171 ++++++++-------
 .../indexedge/tall/IndexEdgeSerializable.scala  |  33 ++-
 .../wide/IndexEdgeDeserializable.scala          | 214 ++++++++++---------
 .../indexedge/wide/IndexEdgeSerializable.scala  |  56 +++--
 .../tall/SnapshotEdgeSerializable.scala         |  18 +-
 .../wide/SnapshotEdgeSerializable.scala         |  22 +-
 .../serde/vertex/VertexSerializable.scala       |  12 +-
 10 files changed, 290 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 56f14a0..7bcc7ef 100644
--- a/CHANGES
+++ b/CHANGES
@@ -135,6 +135,8 @@ Release 0.12.1 - unreleased
     S2GRAPH-75: Use an embedded database as the default metadata storage.
 		(Contributed by Jong Wook Kim<jo...@nyu.edu>, committed by DOYUNG YOON)
 
+    S2GRAPH-81: Separate Serializable's toKeyValues into 3, toRowKey, toQualifier, toValue. (Committed by DOYUNG YOON).
+
   TEST
     
     S2GRAPH-21: Change PostProcessBenchmarkSpec not to store and fetch test data from storage. (Committed by DOYUNG YOON).

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
index b6435e4..b7326f5 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
@@ -56,5 +56,21 @@ object StorageSerializable {
 }
 
 trait StorageSerializable[E] {
-  def toKeyValues: Seq[SKeyValue]
+  val cf = Serializable.edgeCf
+
+  val table: Array[Byte]
+  val ts: Long
+
+  def toRowKey: Array[Byte]
+  def toQualifier: Array[Byte]
+  def toValue: Array[Byte]
+
+  def toKeyValues: Seq[SKeyValue] = {
+    val row = toRowKey
+    val qualifier = toQualifier
+    val value = toValue
+    val kv = SKeyValue(table, row, cf, qualifier, value, ts)
+
+    Seq(kv)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 66a1be4..4bd222f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -181,18 +181,17 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
     val label = queryParam.label
     val edge = toRequestEdge(queryRequest)
 
-    val kv = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+    val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
       val snapshotEdge = edge.toSnapshotEdge
-      snapshotEdgeSerializer(snapshotEdge).toKeyValues.head
-      //      new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
+      snapshotEdgeSerializer(snapshotEdge)
     } else {
-      val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == queryParam.labelOrderSeq)
-      assert(indexedEdgeOpt.isDefined)
-
-      val indexedEdge = indexedEdgeOpt.get
-      indexEdgeSerializer(indexedEdge).toKeyValues.head
+      val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.labelWithDir,
+        edge.op, edge.version, queryParam.labelOrderSeq, edge.propsWithTs)
+      indexEdgeSerializer(indexEdge)
     }
 
+    val (rowKey, qualifier) = (serializer.toRowKey, serializer.toQualifier)
+
     val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
 
     label.schemaVersion match {
@@ -246,8 +245,8 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
         scanner
       case _ =>
         val get =
-          if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
-          else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf)
+          if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
+          else new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
 
         get.maxVersions(1)
         get.setFailfast(true)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index e80a805..e6265f7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -27,8 +27,6 @@ import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
 import scala.collection.immutable
 
-import scala.collection.immutable
-
 class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
    import StorageDeserializable._
 
@@ -76,88 +74,87 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
      (Array.empty[(Byte, InnerValLike)], 0)
    }
 
-   override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                                    _kvs: Seq[T],
-                                                    schemaVer: String,
-                                                    cacheElementOpt: Option[IndexEdge]): IndexEdge = {
-
-     assert(_kvs.size == 1)
-
-     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-     val kv = kvs.head
-     val version = kv.timestamp
-     //    logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
-     var pos = 0
-     val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer)
-     pos += srcIdLen
-     val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-     pos += 4
-     val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-     pos += 1
-
-     val op = kv.row(pos)
-     pos += 1
-
-     if (pos == kv.row.length) {
-       // degree
-       //      val degreeVal = Bytes.toLong(kv.value)
-       val degreeVal = bytesToLongFunc(kv.value, 0)
-       val ts = kv.timestamp
-       val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer),
-         LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer))
-       val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
-       IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
-     } else {
-       // not degree edge
-
-
-       val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
-       pos = endAt
-       val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
-         (HBaseType.defaultTgtVertexId, 0)
-       } else {
-         TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer)
-       }
-
-
-       val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
-       val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
-       /** process indexProps */
-       for {
-         (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
-       } {
-         if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
-         else allProps += seq -> InnerValLikeWithTs(v, version)
-       }
-
-       /** process props */
-       if (op == GraphUtil.operations("incrementCount")) {
-         //        val countVal = Bytes.toLong(kv.value)
-         val countVal = bytesToLongFunc(kv.value, 0)
-         allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
-       } else {
-         val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
-         props.foreach { case (k, v) =>
-           allProps += (k -> InnerValLikeWithTs(v, version))
-         }
-       }
-       val _mergedProps = allProps.result()
-       val mergedProps =
-         if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
-         else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-
-       /** process tgtVertexId */
-       val tgtVertexId =
-         mergedProps.get(LabelMeta.toSeq) match {
-           case None => tgtVertexIdRaw
-           case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
-         }
-
-
-       IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
-
-     }
-   }
- }
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                   _kvs: Seq[T],
+                                                   schemaVer: String,
+                                                   cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+
+    assert(_kvs.size == 1)
+
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+    val kv = kvs.head
+    val version = kv.timestamp
+    //    logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
+    var pos = 0
+    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer)
+    pos += srcIdLen
+    val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+    pos += 4
+    val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+    pos += 1
+
+    val op = kv.row(pos)
+    pos += 1
+
+    if (pos == kv.row.length) {
+      // degree
+      //      val degreeVal = Bytes.toLong(kv.value)
+      val degreeVal = bytesToLongFunc(kv.value, 0)
+      val ts = kv.timestamp
+      val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer),
+        LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer))
+      val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
+      IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
+    } else {
+      // not degree edge
+
+
+      val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
+      pos = endAt
+      val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
+        (HBaseType.defaultTgtVertexId, 0)
+      } else {
+        TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer)
+      }
+
+      val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
+      val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+      /** process indexProps */
+      for {
+        (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+      } {
+        if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
+        else allProps += seq -> InnerValLikeWithTs(v, version)
+      }
+
+      /** process props */
+      if (op == GraphUtil.operations("incrementCount")) {
+        //        val countVal = Bytes.toLong(kv.value)
+        val countVal = bytesToLongFunc(kv.value, 0)
+        allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+      } else {
+        val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
+        props.foreach { case (k, v) =>
+          allProps += (k -> InnerValLikeWithTs(v, version))
+        }
+      }
+      val _mergedProps = allProps.result()
+      val mergedProps =
+        if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+        else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
+
+      /** process tgtVertexId */
+      val tgtVertexId =
+        mergedProps.get(LabelMeta.toSeq) match {
+          case None => tgtVertexIdRaw
+          case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+        }
+
+
+      IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
index d00877e..f17e41c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -29,14 +29,13 @@ import org.apache.s2graph.core.{GraphUtil, IndexEdge}
 class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
    import StorageSerializable._
 
-   val label = indexEdge.label
-   val table = label.hbaseTableName.getBytes()
-   val cf = Serializable.edgeCf
+   override val ts = indexEdge.version
+   override val table = indexEdge.label.hbaseTableName.getBytes()
 
-   val idxPropsMap = indexEdge.orders.toMap
-   val idxPropsBytes = propsToBytes(indexEdge.orders)
+   def idxPropsMap = indexEdge.orders.toMap
+   def idxPropsBytes = propsToBytes(indexEdge.orders)
 
-   override def toKeyValues: Seq[SKeyValue] = {
+   override def toRowKey: Array[Byte] = {
      val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
      val labelWithDirBytes = indexEdge.labelWithDir.bytes
      val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
@@ -53,20 +52,16 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
          }
 
      /** TODO search usage of op byte. if there is no, then remove opByte */
-     val rowBytes = Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier)
-     //    val qualifierBytes = Array.fill(1)(indexEdge.op)
-     val qualifierBytes = Array.empty[Byte]
+     Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier)
+   }
 
-     val value =
-       if (indexEdge.degreeEdge)
-         Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
-       else if (indexEdge.op == GraphUtil.operations("incrementCount"))
-         Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
-       else propsToKeyValues(indexEdge.metas.toSeq)
+   override def toQualifier: Array[Byte] = Array.empty[Byte]
 
-     val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version)
+   override def toValue: Array[Byte] =
+     if (indexEdge.degreeEdge)
+       Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
+     else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+       Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
+     else propsToKeyValues(indexEdge.metas.toSeq)
 
-     //        logger.debug(s"[Ser]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
-     Seq(kv)
-   }
  }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index b1b933f..5a8fa42 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -28,109 +28,113 @@ import scala.collection.immutable
 
 import scala.collection.immutable
 
+import scala.collection.immutable
+
 class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
-   import StorageDeserializable._
-
-   type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
-   type ValueRaw = (Array[(Byte, InnerValLike)], Int)
-
-   private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-     //    val degree = Bytes.toLong(kv.value)
-     val degree = bytesToLongFunc(kv.value, 0)
-     val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
-     val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
-     (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
-   }
-
-   private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-     var qualifierLen = 0
-     var pos = 0
-     val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
-       val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
-       pos = endAt
-       qualifierLen += endAt
-       val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
-         (HBaseType.defaultTgtVertexId, 0)
-       } else {
-         TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
-       }
-       qualifierLen += tgtVertexIdLen
-       (props, endAt, tgtVertexId, tgtVertexIdLen)
-     }
-     val (op, opLen) =
-       if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
-       else (kv.qualifier(qualifierLen), 1)
-
-     qualifierLen += opLen
-
-     (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
-   }
-
-   private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
-     val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
-     (props, endAt)
-   }
-
-   private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
-     (Array.empty[(Byte, InnerValLike)], 0)
-   }
-
-   override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                                    _kvs: Seq[T],
-                                                    schemaVer: String,
-                                                    cacheElementOpt: Option[IndexEdge]): IndexEdge = {
-     assert(_kvs.size == 1)
-
-     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-     val kv = kvs.head
-     val version = kv.timestamp
-
-     val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
-       (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
-     }.getOrElse(parseRow(kv, schemaVer))
-
-     val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
-       if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
-       else parseQualifier(kv, schemaVer)
-
-     val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
-     val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
-     /** process indexProps */
-     for {
-       (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
-     } {
-       if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
-       else allProps += seq -> InnerValLikeWithTs(v, version)
-     }
-
-     /** process props */
-     if (op == GraphUtil.operations("incrementCount")) {
-       //      val countVal = Bytes.toLong(kv.value)
-       val countVal = bytesToLongFunc(kv.value, 0)
-       allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
-     } else if (kv.qualifier.isEmpty) {
-       val countVal = bytesToLongFunc(kv.value, 0)
-       allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
-     } else {
-       val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
-       props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) }
-     }
-
-     val _mergedProps = allProps.result()
-     val mergedProps =
-       if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
-            else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-
-     /** process tgtVertexId */
-     val tgtVertexId =
-       mergedProps.get(LabelMeta.toSeq) match {
-         case None => tgtVertexIdRaw
-         case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
-       }
-
-     IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
-
-   }
- }
+
+
+  import StorageDeserializable._
+
+  type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
+  type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+
+  private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+    //    val degree = Bytes.toLong(kv.value)
+    val degree = bytesToLongFunc(kv.value, 0)
+    val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
+    val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+    (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+  }
+
+  private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+    var qualifierLen = 0
+    var pos = 0
+    val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+      val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+      pos = endAt
+      qualifierLen += endAt
+      val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+        (HBaseType.defaultTgtVertexId, 0)
+      } else {
+        TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
+      }
+      qualifierLen += tgtVertexIdLen
+      (props, endAt, tgtVertexId, tgtVertexIdLen)
+    }
+    val (op, opLen) =
+      if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+      else (kv.qualifier(qualifierLen), 1)
+
+    qualifierLen += opLen
+
+    (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+  }
+
+  private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
+    val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+    (props, endAt)
+  }
+
+  private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
+    (Array.empty[(Byte, InnerValLike)], 0)
+  }
+
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                   _kvs: Seq[T],
+                                                   schemaVer: String,
+                                                   cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+    assert(_kvs.size == 1)
+
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+    val kv = kvs.head
+    val version = kv.timestamp
+
+    val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
+      (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+    }.getOrElse(parseRow(kv, schemaVer))
+
+    val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
+      if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
+      else parseQualifier(kv, schemaVer)
+
+    val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
+    val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+    /** process indexProps */
+    for {
+      (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+    } {
+      if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
+      else allProps += seq -> InnerValLikeWithTs(v, version)
+    }
+
+    /** process props */
+    if (op == GraphUtil.operations("incrementCount")) {
+      //      val countVal = Bytes.toLong(kv.value)
+      val countVal = bytesToLongFunc(kv.value, 0)
+      allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+    } else if (kv.qualifier.isEmpty) {
+      val countVal = bytesToLongFunc(kv.value, 0)
+      allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+    } else {
+      val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
+      props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) }
+    }
+
+    val _mergedProps = allProps.result()
+    val mergedProps =
+      if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+      else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
+
+    /** process tgtVertexId */
+    val tgtVertexId =
+      mergedProps.get(LabelMeta.toSeq) match {
+        case None => tgtVertexIdRaw
+        case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+      }
+
+    IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index 49e95b4..83d4338 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -28,44 +28,40 @@ import org.apache.s2graph.core.{GraphUtil, IndexEdge}
 class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
    import StorageSerializable._
 
-   val label = indexEdge.label
-   val table = label.hbaseTableName.getBytes()
-   val cf = Serializable.edgeCf
+   override val ts = indexEdge.version
+   override val table = indexEdge.label.hbaseTableName.getBytes()
 
-   val idxPropsMap = indexEdge.orders.toMap
-   val idxPropsBytes = propsToBytes(indexEdge.orders)
+   def idxPropsMap = indexEdge.orders.toMap
+   def idxPropsBytes = propsToBytes(indexEdge.orders)
 
-   override def toKeyValues: Seq[SKeyValue] = {
+   override def toRowKey: Array[Byte] = {
      val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
      val labelWithDirBytes = indexEdge.labelWithDir.bytes
      val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
 
-     val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-     //    logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
+     Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+   }
+
+   override def toQualifier: Array[Byte] = {
      val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
-     val qualifier =
-       if (indexEdge.degreeEdge) Array.empty[Byte]
-       else {
-         if (indexEdge.op == GraphUtil.operations("incrementCount")) {
-           Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
-         } else {
-           idxPropsMap.get(LabelMeta.toSeq) match {
-             case None => Bytes.add(idxPropsBytes, tgtIdBytes)
-             case Some(vId) => idxPropsBytes
-           }
+     if (indexEdge.degreeEdge) Array.empty[Byte]
+     else {
+       if (indexEdge.op == GraphUtil.operations("incrementCount")) {
+         Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
+       } else {
+         idxPropsMap.get(LabelMeta.toSeq) match {
+           case None => Bytes.add(idxPropsBytes, tgtIdBytes)
+           case Some(vId) => idxPropsBytes
          }
        }
-
-
-     val value =
-       if (indexEdge.degreeEdge)
-         Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
-       else if (indexEdge.op == GraphUtil.operations("incrementCount"))
-         Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
-       else propsToKeyValues(indexEdge.metas.toSeq)
-
-     val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
-
-     Seq(kv)
+     }
    }
+
+  override def toValue: Array[Byte] =
+    if (indexEdge.degreeEdge)
+      Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
+    else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+      Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
+    else propsToKeyValues(indexEdge.metas.toSeq)
+  
  }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 716a6b9..4f7c17b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -29,9 +29,8 @@ import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair
 class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
   import StorageSerializable._
 
-  val label = snapshotEdge.label
-  val table = label.hbaseTableName.getBytes()
-  val cf = Serializable.edgeCf
+  override val ts = snapshotEdge.version
+  override val table = snapshotEdge.label.hbaseTableName.getBytes()
 
   def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
     val byte = (((statusCode << 4) | op).toByte)
@@ -40,16 +39,18 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
   def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
     propsToKeyValuesWithTs(snapshotEdge.props.toList))
 
-  override def toKeyValues: Seq[SKeyValue] = {
+  override def toRowKey: Array[Byte] = {
     val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
     val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
     val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
 
-    val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+    Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+  }
 
-    val qualifier = Array.empty[Byte]
+  override def toQualifier: Array[Byte] = Array.empty[Byte]
 
-    val value = snapshotEdge.pendingEdgeOpt match {
+  override def toValue: Array[Byte] =
+    snapshotEdge.pendingEdgeOpt match {
       case None => valueBytes()
       case Some(pendingEdge) =>
         val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
@@ -60,7 +61,4 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
         Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
     }
 
-    val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
-    Seq(kv)
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index 2eb2b1b..757ef1b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -34,9 +34,8 @@ import org.apache.s2graph.core.types.VertexId
 class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
   import StorageSerializable._
 
-  val label = snapshotEdge.label
-  val table = label.hbaseTableName.getBytes()
-  val cf = Serializable.edgeCf
+  override val ts = snapshotEdge.version
+  override val table = snapshotEdge.label.hbaseTableName.getBytes()
 
   def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
     val byte = (((statusCode << 4) | op).toByte)
@@ -45,17 +44,20 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
   def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
     propsToKeyValuesWithTs(snapshotEdge.props.toList))
 
-  override def toKeyValues: Seq[SKeyValue] = {
+
+  override def toRowKey: Array[Byte] = {
     val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes
     val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
     val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
 
-    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-    val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
+    Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+  }
 
-    val qualifier = tgtIdBytes
+  override def toQualifier: Array[Byte] =
+    VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
 
-    val value = snapshotEdge.pendingEdgeOpt match {
+  override def toValue: Array[Byte] =
+    snapshotEdge.pendingEdgeOpt match {
       case None => valueBytes()
       case Some(pendingEdge) =>
         val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
@@ -64,7 +66,5 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
         val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
         Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
     }
-    val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
-    Seq(kv)
-  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
index a74031a..6bb162c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -25,10 +25,18 @@ import org.apache.s2graph.core.storage.{SKeyValue, Serializable}
 
 case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
 
-  val cf = Serializable.vertexCf
+  override val table = vertex.hbaseTableName.getBytes
+  override val ts = vertex.ts
+  override val cf = Serializable.vertexCf
 
+  override def toRowKey: Array[Byte] = vertex.id.bytes
+
+  override def toQualifier: Array[Byte] = Array.empty[Byte]
+  override def toValue: Array[Byte] = Array.empty[Byte]
+
+  /** Vertex overrides toKeyValues because a vertex is expected to produce multiple SKeyValues. */
   override def toKeyValues: Seq[SKeyValue] = {
-    val row = vertex.id.bytes
+    val row = toRowKey
     val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
     val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
     (base ++ belongsTo).map { case (qualifier, value) =>