You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/03/04 15:56:20 UTC

incubator-s2graph git commit: [S2GRAPH-53]: Refactor Storage to decide which serializer/deserializer for IndexEdge/SnapshotEdge/Vertex.

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master e207f676f -> fd8119bc9


[S2GRAPH-53]: Refactor Storage to decide which serializer/deserializer for IndexEdge/SnapshotEdge/Vertex.

  add serde package and change storage to contain compatibility table.

JIRA:
  [S2GRAPH-53] https://issues.apache.org/jira/browse/S2GRAPH-53

Pull Request:
  Closes #37


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/fd8119bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/fd8119bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/fd8119bc

Branch: refs/heads/master
Commit: fd8119bc9dc1cabc07bcf8b7dc49258345f45a3e
Parents: e207f67
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Mar 4 23:49:30 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Mar 4 23:49:30 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../core/storage/IndexEdgeDeserializable.scala  | 128 -----------------
 .../core/storage/IndexEdgeSerializable.scala    |  58 --------
 .../storage/SnapshotEdgeDeserializable.scala    | 142 -------------------
 .../core/storage/SnapshotEdgeSerializable.scala |  76 ----------
 .../kakao/s2graph/core/storage/Storage.scala    |  59 ++++++--
 .../core/storage/VertexDeserializable.scala     |  46 ------
 .../core/storage/VertexSerializable.scala       |  18 ---
 .../tall/IndexEdgeDeserializable.scala          | 132 +++++++++++++++++
 .../indexedge/tall/IndexEdgeSerializable.scala  |  53 +++++++
 .../wide/IndexEdgeDeserializable.scala          | 116 +++++++++++++++
 .../indexedge/wide/IndexEdgeSerializable.scala  |  53 +++++++
 .../tall/SnapshotEdgeDeserializable.scala       |  84 +++++++++++
 .../tall/SnapshotEdgeSerializable.scala         |  47 ++++++
 .../wide/SnapshotEdgeDeserializable.scala       |  70 +++++++++
 .../wide/SnapshotEdgeSerializable.scala         |  50 +++++++
 .../serde/vertex/VertexDeserializable.scala     |  46 ++++++
 .../serde/vertex/VertexSerializable.scala       |  20 +++
 18 files changed, 725 insertions(+), 476 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 97434d6..a91718f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -37,6 +37,9 @@ Release 0.12.1 - unreleased
 
     S2GRAPH-44: Provide cache for WhereParser on query (Committed by DOYUNG YOON).
 
+    S2GRAPH-53: Refactor Storage to decide which serializer/deserializer for IndexEdge/SnapshotEdge/Vertex 
+		(Committed by DOYUNG YOON).
+
   BUG FIXES
 
     S2GRAPH-18: Query Option "interval" is Broken. 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala
deleted file mode 100644
index 2190222..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.{CanSKeyValue, StorageDeserializable, SKeyValue}
-import com.kakao.s2graph.core.types._
-import org.apache.hadoop.hbase.util.Bytes
-import StorageDeserializable._
-
-class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
-
-  import StorageDeserializable._
-
-  type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
-  type ValueRaw = (Array[(Byte, InnerValLike)], Int)
-
-  private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-//    val degree = Bytes.toLong(kv.value)
-    val degree = bytesToLongFunc(kv.value, 0)
-    val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
-    val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
-    (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
-  }
-
-  private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-    var qualifierLen = 0
-    var pos = 0
-    val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
-      val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
-      pos = endAt
-      qualifierLen += endAt
-      val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
-        (HBaseType.defaultTgtVertexId, 0)
-      } else {
-        TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
-      }
-      qualifierLen += tgtVertexIdLen
-      (props, endAt, tgtVertexId, tgtVertexIdLen)
-    }
-    val (op, opLen) =
-      if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
-      else (kv.qualifier(qualifierLen), 1)
-
-    qualifierLen += opLen
-
-    (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
-  }
-
-  private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
-    val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
-    (props, endAt)
-  }
-
-  private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
-    (Array.empty[(Byte, InnerValLike)], 0)
-  }
-
-
-
-  /** version 1 and version 2 is same logic */
-  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                              _kvs: Seq[T],
-                                              version: String,
-                                              cacheElementOpt: Option[IndexEdge] = None): IndexEdge = {
-    fromKeyValuesInnerOld(queryParam, _kvs, version, cacheElementOpt)
-  }
-
-  def fromKeyValuesInnerOld[T: CanSKeyValue](queryParam: QueryParam,
-                                          _kvs: Seq[T],
-                                          version: String,
-                                          cacheElementOpt: Option[IndexEdge] = None): IndexEdge = {
-    assert(_kvs.size == 1)
-
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-    val kv = kvs.head
-    val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
-      (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
-    }.getOrElse(parseRow(kv, version))
-
-    val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
-      if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
-      else parseQualifier(kv, version)
-
-    val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
-//      val countVal = Bytes.toLong(kv.value)
-      val countVal = bytesToLongFunc(kv.value, 0)
-      val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
-      (dummyProps, 8)
-    } else if (kv.qualifier.isEmpty) {
-      parseDegreeValue(kv, version)
-    } else {
-      parseValue(kv, version)
-    }
-
-    val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
-
-    //    assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size)
-
-    val idxProps = for {
-      (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
-    } yield {
-        if (k == LabelMeta.degreeSeq) k -> v
-        else seq -> v
-      }
-
-    val idxPropsMap = idxProps.toMap
-    val tgtVertexId = if (tgtVertexIdInQualifier) {
-      idxPropsMap.get(LabelMeta.toSeq) match {
-        case None => tgtVertexIdRaw
-        case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
-      }
-    } else tgtVertexIdRaw
-
-    val _mergedProps = (idxProps ++ props).toMap
-    val mergedProps =
-      if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
-      else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
-
-    //    logger.error(s"$mergedProps")
-    //    val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong
-
-    val ts = kv.timestamp
-    IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala
deleted file mode 100644
index 56f70b9..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.{StorageSerializable, SKeyValue}
-import com.kakao.s2graph.core.types.{HBaseType, VertexId}
-import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
-import org.apache.hadoop.hbase.util.Bytes
-
-case class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
-
-  import StorageSerializable._
-
-  val label = indexEdge.label
-  val table = label.hbaseTableName.getBytes()
-  val cf = Serializable.edgeCf
-
-  val idxPropsMap = indexEdge.orders.toMap
-  val idxPropsBytes = propsToBytes(indexEdge.orders)
-
-  /** version 1 and version 2 share same code for serialize row key part */
-  override def toKeyValues: Seq[SKeyValue] = {
-    toKeyValuesInner
-  }
-  def toKeyValuesInner: Seq[SKeyValue] = {
-    val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
-    val labelWithDirBytes = indexEdge.labelWithDir.bytes
-    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
-
-    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-    //    logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
-    val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
-    val qualifier =
-      if (indexEdge.degreeEdge) Array.empty[Byte]
-      else {
-        if (indexEdge.op == GraphUtil.operations("incrementCount")) {
-          Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
-        } else {
-          idxPropsMap.get(LabelMeta.toSeq) match {
-            case None => Bytes.add(idxPropsBytes, tgtIdBytes)
-            case Some(vId) => idxPropsBytes
-          }
-        }
-      }
-
-
-    val value =
-      if (indexEdge.degreeEdge)
-        Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
-      else if (indexEdge.op == GraphUtil.operations("incrementCount"))
-        Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
-      else propsToKeyValues(indexEdge.metas.toSeq)
-
-    val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
-
-    Seq(kv)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala
deleted file mode 100644
index d4f55f0..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
-import com.kakao.s2graph.core.storage.{CanSKeyValue, SKeyValue, StorageDeserializable}
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
-class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
-
-  import StorageDeserializable._
-
-  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                                   _kvs: Seq[T],
-                                                   version: String,
-                                                   cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
-    queryParam.label.schemaVersion match {
-      case HBaseType.VERSION2 | HBaseType.VERSION1 => fromKeyValuesInnerOld(queryParam, _kvs, version, cacheElementOpt)
-      case _  => fromKeyValuesInnerV3(queryParam, _kvs, version, cacheElementOpt)
-    }
-  }
-
-  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
-    val statusCode = byte >> 4
-    val op = byte & ((1 << 4) - 1)
-    (statusCode.toByte, op.toByte)
-  }
-
-  def fromKeyValuesInnerOld[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-    assert(kvs.size == 1)
-
-    val kv = kvs.head
-    val schemaVer = queryParam.label.schemaVersion
-    val cellVersion = kv.timestamp
-
-    val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
-      (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
-    }.getOrElse(parseRow(kv, schemaVer))
-
-    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
-      val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
-      var pos = 0
-      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
-      pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-      val kvsMap = props.toMap
-      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
-
-      pos = endAt
-      val _pendingEdgeOpt =
-        if (pos == kv.value.length) None
-        else {
-          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
-          pos += 1
-          //          val versionNum = Bytes.toLong(kv.value, pos, 8)
-          //          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-          pos = endAt
-          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
-
-          val pendingEdge =
-            Edge(Vertex(srcVertexId, cellVersion),
-              Vertex(tgtVertexId, cellVersion),
-              labelWithDir, pendingEdgeOp,
-              cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
-          Option(pendingEdge)
-        }
-
-      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
-    }
-
-    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
-      labelWithDir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
-  }
-
-  private def fromKeyValuesInnerV3[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-    assert(kvs.size == 1)
-
-    val kv = kvs.head
-    val schemaVer = queryParam.label.schemaVersion
-    val cellVersion = kv.timestamp
-    /** rowKey */
-    def parseRowV3(kv: SKeyValue, version: String) = {
-      var pos = 0
-      val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
-      pos += srcIdAndTgtIdLen
-      val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-      pos += 4
-      val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-
-      val rowLen = srcIdAndTgtIdLen + 4 + 1
-      (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen)
-
-    }
-    val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
-      (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
-    }.getOrElse(parseRowV3(kv, schemaVer))
-
-    val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
-    val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
-
-    val (props, op, ts, statusCode, _pendingEdgeOpt) = {
-      var pos = 0
-      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
-      pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-      val kvsMap = props.toMap
-      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
-
-      pos = endAt
-      val _pendingEdgeOpt =
-        if (pos == kv.value.length) None
-        else {
-          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
-          pos += 1
-          //          val versionNum = Bytes.toLong(kv.value, pos, 8)
-          //          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-          pos = endAt
-          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
-
-          val pendingEdge =
-            Edge(Vertex(srcVertexId, cellVersion),
-              Vertex(tgtVertexId, cellVersion),
-              labelWithDir, pendingEdgeOp,
-              cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
-          Option(pendingEdge)
-        }
-
-      (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
-    }
-
-    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
-      labelWithDir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala
deleted file mode 100644
index 9e6e1b7..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.SnapshotEdge
-import com.kakao.s2graph.core.mysqls.LabelIndex
-import com.kakao.s2graph.core.types.{HBaseType, SourceAndTargetVertexIdPair, VertexId}
-import org.apache.hadoop.hbase.util.Bytes
-
-
-class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
-  import StorageSerializable._
-
-  val label = snapshotEdge.label
-  val table = label.hbaseTableName.getBytes()
-  val cf = Serializable.edgeCf
-
-  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
-    val byte = (((statusCode << 4) | op).toByte)
-    Array.fill(1)(byte.toByte)
-  }
-  def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
-    propsToKeyValuesWithTs(snapshotEdge.props.toList))
-
-  override def toKeyValues: Seq[SKeyValue] = {
-    label.schemaVersion match {
-      case HBaseType.VERSION1 | HBaseType.VERSION2 => toKeyValuesInner
-      case _ => toKeyValuesInnerV3
-    }
-  }
-
-  private def toKeyValuesInner: Seq[SKeyValue] = {
-    val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes
-    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
-    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
-
-    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-    val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
-
-    val qualifier = tgtIdBytes
-
-    val value = snapshotEdge.pendingEdgeOpt match {
-      case None => valueBytes()
-      case Some(pendingEdge) =>
-        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
-        val versionBytes = Array.empty[Byte]
-        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
-        val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
-        Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
-    }
-    val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
-    Seq(kv)
-  }
-
-  private def toKeyValuesInnerV3: Seq[SKeyValue] = {
-    val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
-    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
-    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
-
-    val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-
-    val qualifier = Array.empty[Byte]
-
-    val value = snapshotEdge.pendingEdgeOpt match {
-      case None => valueBytes()
-      case Some(pendingEdge) =>
-        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
-        val versionBytes = Array.empty[Byte]
-        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
-        val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
-
-        Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
-    }
-
-    val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
-    Seq(kv)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
index bca8df3..cc8f13c 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
@@ -4,6 +4,10 @@ import com.kakao.s2graph.core.ExceptionHandler.{Key, Val, KafkaMessage}
 import com.kakao.s2graph.core.GraphExceptions.FetchTimeoutException
 import com.kakao.s2graph.core._
 import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.storage.serde._
+import com.kakao.s2graph.core.storage.serde.snapshotedge.tall
+import com.kakao.s2graph.core.storage.serde.snapshotedge.wide.SnapshotEdgeDeserializable
+import com.kakao.s2graph.core.storage.serde.vertex._
 import com.kakao.s2graph.core.types._
 import com.kakao.s2graph.core.utils.{Extensions, logger}
 import com.typesafe.config.Config
@@ -16,6 +20,8 @@ import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Random, Try}
 
 abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
+  import HBaseType._
+
   /** storage dependent configurations */
   val MaxRetryNum = config.getInt("max.retry.number")
   val MaxBackOff = config.getInt("max.back.off")
@@ -26,6 +32,14 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
   val expireAfterWrite = config.getInt("future.cache.expire.after.write")
   val expireAfterAccess = config.getInt("future.cache.expire.after.access")
 
+  /**
+   * Compatibility table
+   * | label schema version | snapshot edge | index edge | vertex | note |
+   * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exists only for backward compatibility |
+   * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exists only for backward compatibility |
+   * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | recommended with HBase. current stable schema |
+   *
+   */
 
   /**
    * create serializer that knows how to convert given snapshotEdge into kvs: Seq[SKeyValue]
@@ -33,14 +47,26 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
    * @param snapshotEdge: snapshotEdge to serialize
    * @return serializer implementation for StorageSerializable which has toKeyValues return Seq[SKeyValue]
    */
-  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = new SnapshotEdgeSerializable(snapshotEdge)
+  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = {
+    snapshotEdge.schemaVer match {
+      case VERSION1 | VERSION2 => new serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
+      case VERSION3 => new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
+      case _ => throw new RuntimeException(s"not supported version: ${snapshotEdge.schemaVer}")
+    }
+  }
 
   /**
    * create serializer that knows how to convert given indexEdge into kvs: Seq[SKeyValue]
-   * @param indexedEdge: indexEdge to serialize
+   * @param indexEdge: indexEdge to serialize
    * @return serializer implementation
    */
-  def indexEdgeSerializer(indexedEdge: IndexEdge) = new IndexEdgeSerializable(indexedEdge)
+  def indexEdgeSerializer(indexEdge: IndexEdge) = {
+    indexEdge.schemaVer match {
+      case VERSION1 | VERSION2 | VERSION3 => new indexedge.wide.IndexEdgeSerializable(indexEdge)
+      case _ => throw new RuntimeException(s"not supported version: ${indexEdge.schemaVer}")
+
+    }
+  }
 
   /**
    * create serializer that knows how to convert given vertex into kvs: Seq[SKeyValue]
@@ -58,10 +84,24 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
    * if any storaage use different class to represent stored byte array,
    * then that storage implementation is responsible to provide implicit type conversion method on CanSKeyValue.
    * */
-  val snapshotEdgeDeserializer = new SnapshotEdgeDeserializable
+
+  val snapshotEdgeDeserializers = Map(
+    VERSION1 -> new snapshotedge.wide.SnapshotEdgeDeserializable,
+    VERSION2 -> new snapshotedge.wide.SnapshotEdgeDeserializable,
+    VERSION3 -> new tall.SnapshotEdgeDeserializable
+  )
+  def snapshotEdgeDeserializer(schemaVer: String) =
+    snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
 
   /** create deserializer that can parse stored CanSKeyValue into indexEdge. */
-  val indexEdgeDeserializer = new IndexEdgeDeserializable
+  val indexEdgeDeserializers = Map(
+    VERSION1 -> new indexedge.wide.IndexEdgeDeserializable,
+    VERSION2 -> new indexedge.wide.IndexEdgeDeserializable,
+    VERSION3 -> new indexedge.wide.IndexEdgeDeserializable
+  )
+
+  def indexEdgeDeserializer(schemaVer: String) =
+    indexEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
 
   /** create deserializer that can parser stored CanSKeyValue into vertex. */
   val vertexDeserializer = new VertexDeserializable
@@ -587,7 +627,8 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
                               parentEdges: Seq[EdgeWithScore]): Option[Edge] = {
 //        logger.debug(s"toEdge: $kv")
     try {
-      val indexEdgeOpt = indexEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+      val schemaVer = queryParam.label.schemaVersion
+      val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
       indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges))
     } catch {
       case ex: Exception =>
@@ -602,7 +643,8 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
                                       isInnerCall: Boolean,
                                       parentEdges: Seq[EdgeWithScore]): Option[Edge] = {
 //        logger.debug(s"SnapshottoEdge: $kv")
-    val snapshotEdgeOpt = snapshotEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+    val schemaVer = queryParam.label.schemaVersion
+    val snapshotEdgeOpt = snapshotEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
 
     if (isInnerCall) {
       snapshotEdgeOpt.flatMap { snapshotEdge =>
@@ -631,9 +673,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
     else {
       val first = kvs.head
       val kv = first
+      val schemaVer = queryParam.label.schemaVersion
       val cacheElementOpt =
         if (queryParam.isSnapshotEdge) None
-        else indexEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)
+        else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)
 
       for {
         kv <- kvs

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala
deleted file mode 100644
index 699981d..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
-import com.kakao.s2graph.core.{QueryParam, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
-import scala.collection.mutable.ListBuffer
-
-class VertexDeserializable extends Deserializable[Vertex] {
-  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                     _kvs: Seq[T],
-                                     version: String,
-                                     cacheElementOpt: Option[Vertex]): Vertex = {
-
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-    val kv = kvs.head
-    val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
-
-    var maxTs = Long.MinValue
-    val propsMap = new collection.mutable.HashMap[Int, InnerValLike]
-    val belongLabelIds = new ListBuffer[Int]
-
-    for {
-      kv <- kvs
-    } {
-      val propKey =
-        if (kv.qualifier.length == 1) kv.qualifier.head.toInt
-        else Bytes.toInt(kv.qualifier)
-
-      val ts = kv.timestamp
-      if (ts > maxTs) maxTs = ts
-
-      if (Vertex.isLabelId(propKey)) {
-        belongLabelIds += Vertex.toLabelId(propKey)
-      } else {
-        val v = kv.value
-        val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
-        propsMap += (propKey -> value)
-      }
-    }
-    assert(maxTs != Long.MinValue)
-    Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala
deleted file mode 100644
index bda909d..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.Vertex
-import org.apache.hadoop.hbase.util.Bytes
-
-case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
-
-  val cf = Serializable.vertexCf
-
-  override def toKeyValues: Seq[SKeyValue] = {
-    val row = vertex.id.bytes
-    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
-    val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
-    (base ++ belongsTo).map { case (qualifier, value) =>
-      SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
-    } toSeq
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
new file mode 100644
index 0000000..014a5c9
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -0,0 +1,132 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.tall
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
+import com.kakao.s2graph.core.types._
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+
+class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { // tall layout: fromKeyValuesInner decodes the edge from kv.row; bytesToLongFunc decodes degree/count values
+   import StorageDeserializable._
+   // QualifierRaw: (index props, target vertex id, op byte, target id found in the bytes?, bytes consumed)
+   type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
+   type ValueRaw = (Array[(Byte, InnerValLike)], Int) // (decoded props, end offset)
+   // NOTE(review): none of the four private parse* helpers below is called from fromKeyValuesInner in this class - confirm they are still needed.
+   private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+     // The degree is read as a long from kv.value using the injected function.
+     val degree = bytesToLongFunc(kv.value, 0)
+     val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
+     val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) // placeholder target id built from "0"
+     (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+   }
+
+   private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+     var qualifierLen = 0 // bytes of the qualifier consumed so far
+     var pos = 0
+     val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+       val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+       pos = endAt
+       qualifierLen += endAt
+       val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { // props used up the whole qualifier: no target id present
+         (HBaseType.defaultTgtVertexId, 0)
+       } else {
+         TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
+       }
+       qualifierLen += tgtVertexIdLen
+       (props, endAt, tgtVertexId, tgtVertexIdLen)
+     }
+     val (op, opLen) =
+       if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+       else (kv.qualifier(qualifierLen), 1) // a byte left over after props and target id is taken as the op
+
+     qualifierLen += opLen
+
+     (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+   }
+
+   private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
+     val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+     (props, endAt)
+   }
+
+   private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
+     (Array.empty[(Byte, InnerValLike)], 0)
+   }
+   // Rebuilds one IndexEdge from exactly one key-value. Row as read below: source vertex id, 4-byte label+direction, 1 byte index seq (with inverted flag), 1 op byte, then index props and an optional target vertex id.
+   override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                    _kvs: Seq[T],
+                                                    version: String,
+                                                    cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+     // Exactly one key-value is expected per call; cacheElementOpt is not consulted in this implementation.
+     assert(_kvs.size == 1)
+
+     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+     val kv = kvs.head
+
+     //    logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
+     var pos = 0
+     val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version)
+     pos += srcIdLen
+     val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+     pos += 4
+     val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+     pos += 1
+
+     val op = kv.row(pos)
+     pos += 1
+
+     if (pos == kv.row.length) { // nothing follows the op byte => degree row
+       // degree
+       //      val degreeVal = Bytes.toLong(kv.value)
+       val degreeVal = bytesToLongFunc(kv.value, 0)
+       val ts = kv.timestamp
+       val props = Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, version),
+         LabelMeta.degreeSeq -> InnerVal.withLong(degreeVal, version))
+       val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+       IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
+     } else {
+       // not degree edge
+       val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+       val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version)
+       pos = endAt
+       val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
+         (HBaseType.defaultTgtVertexId, 0)
+       } else {
+         TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version)
+       }
+       // Pair the decoded index props positionally with the index's meta seqs; a decoded degreeSeq key is kept as-is.
+       val idxProps = for {
+         (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+       } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v
+
+       val idxPropsMap = idxProps.toMap
+       // If the index props carry LabelMeta.toSeq, that value becomes the target vertex id instead of the one decoded from the row.
+       val tgtVertexId =
+         idxPropsMap.get(LabelMeta.toSeq) match {
+           case None => tgtVertexIdRaw
+           case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+         }
+       // NOTE(review): op is read from the row, where the tall IndexEdgeSerializable always writes GraphUtil.defaultOpByte - verify the incrementCount branch is reachable.
+       val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+         //        val countVal = Bytes.toLong(kv.value)
+         val countVal = bytesToLongFunc(kv.value, 0)
+         val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
+         (dummyProps, 8)
+       } else {
+         bytesToKeyValues(kv.value, 0, kv.value.length, version)
+       }
+       // Value props override index props on key clashes (later entries win in toMap); timeStampSeq defaults to the cell timestamp.
+       val _mergedProps = (idxProps ++ props).toMap
+       val mergedProps =
+         if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+         else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
+
+       val ts = kv.timestamp
+       IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
+
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
new file mode 100644
index 0000000..46ad15f
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -0,0 +1,53 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.tall
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.VertexId
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.hadoop.hbase.util.Bytes
+
+
+class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+   import StorageSerializable._
+   // Tall layout: per-edge data (index props, target id) is appended to the row key; the qualifier is empty.
+   val label = indexEdge.label
+   val table = label.hbaseTableName.getBytes()
+   val cf = Serializable.edgeCf
+   // Index ("order") props, as a lookup map and in encoded form.
+   val idxPropsMap = indexEdge.orders.toMap
+   val idxPropsBytes = propsToBytes(indexEdge.orders)
+
+   // Emits exactly one SKeyValue.
+   //   row    = source vertex id | label+direction | index seq (not inverted) | default op byte | suffix
+   //   suffix = empty for degree edges; otherwise index props, plus the target id unless
+   //            LabelMeta.toSeq is an index prop
+   //   value  = degree or count as a long for degree / incrementCount edges, else the encoded meta props
+   override def toKeyValues: Seq[SKeyValue] = {
+     val rowPrefix = Bytes.add(
+       VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes,
+       indexEdge.labelWithDir.bytes,
+       labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false))
+
+     // degreeEdge is read once and reused for both the row suffix and the value.
+     val isDegree = indexEdge.degreeEdge
+     val rowSuffix =
+       if (isDegree) Array.empty[Byte]
+       else if (idxPropsMap.contains(LabelMeta.toSeq)) idxPropsBytes
+       else Bytes.add(idxPropsBytes, VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes)
+
+     // TODO search usage of op byte. if there is no, then remove opByte
+     val rowKey = Bytes.add(rowPrefix, Array(GraphUtil.defaultOpByte), rowSuffix)
+
+     val value =
+       if (isDegree)
+         Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
+       else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+         Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
+       else
+         propsToKeyValues(indexEdge.metas.toSeq)
+
+     // The edge's own op is not written here: the row carries the default op byte and the qualifier is empty.
+     val kv = SKeyValue(table, rowKey, cf, Array.empty[Byte], value, indexEdge.version)
+     Seq(kv)
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
new file mode 100644
index 0000000..f83dd1f
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -0,0 +1,116 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.wide
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
+import com.kakao.s2graph.core.types._
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+
+class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { // wide layout: index props, optional target vertex id and optional op byte are read from kv.qualifier
+   import StorageDeserializable._
+   // QualifierRaw: (index props, target vertex id, op byte, target id found in the bytes?, bytes consumed)
+   type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
+   type ValueRaw = (Array[(Byte, InnerValLike)], Int) // (decoded props, end offset)
+
+   private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+     // The degree is read as a long from kv.value using the injected function.
+     val degree = bytesToLongFunc(kv.value, 0)
+     val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
+     val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) // placeholder target id built from "0"
+     (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+   }
+   // Parses a non-degree qualifier: index props, then an optional target vertex id, then an optional trailing op byte.
+   private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+     var qualifierLen = 0 // bytes of the qualifier consumed so far
+     var pos = 0
+     val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+       val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+       pos = endAt
+       qualifierLen += endAt
+       val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { // props used up the whole qualifier: no target id present
+         (HBaseType.defaultTgtVertexId, 0)
+       } else {
+         TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
+       }
+       qualifierLen += tgtVertexIdLen
+       (props, endAt, tgtVertexId, tgtVertexIdLen)
+     }
+     val (op, opLen) =
+       if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+       else (kv.qualifier(qualifierLen), 1) // a byte left over after props and target id is taken as the op
+
+     qualifierLen += opLen
+
+     (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+   }
+   // Decodes the cell value into (props, end offset).
+   private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
+     val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+     (props, endAt)
+   }
+   // Degree cells contribute no value props; their long value is read by parseDegreeQualifier instead.
+   private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
+     (Array.empty[(Byte, InnerValLike)], 0)
+   }
+   // Rebuilds one IndexEdge from exactly one key-value; an empty qualifier marks a degree cell.
+   override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                    _kvs: Seq[T],
+                                                    version: String,
+                                                    cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+     assert(_kvs.size == 1)
+     // Row fields come from cacheElementOpt when given, otherwise from parseRow (not defined in this class; presumably supplied by the StorageDeserializable._ import or the Deserializable parent - confirm).
+     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+     val kv = kvs.head
+     val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
+       (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+     }.getOrElse(parseRow(kv, version))
+
+     val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
+       if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
+       else parseQualifier(kv, version)
+     // incrementCount cells hold a long count; degree cells have no value props; everything else is a key/value prop list.
+     val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+       //      val countVal = Bytes.toLong(kv.value)
+       val countVal = bytesToLongFunc(kv.value, 0)
+       val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
+       (dummyProps, 8)
+     } else if (kv.qualifier.isEmpty) {
+       parseDegreeValue(kv, version)
+     } else {
+       parseValue(kv, version)
+     }
+
+     val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+
+     //    assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size)
+     // Pair the decoded index props positionally with the index's meta seqs; a decoded degreeSeq key is kept as-is.
+     val idxProps = for {
+       (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+     } yield {
+         if (k == LabelMeta.degreeSeq) k -> v
+         else seq -> v
+       }
+     // NOTE(review): when the qualifier holds no target id, tgtVertexIdRaw (HBaseType.defaultTgtVertexId from parseQualifier) is returned without consulting idxPropsMap, although the wide IndexEdgeSerializable omits the target id exactly when LabelMeta.toSeq is an index prop - confirm this is intended.
+     val idxPropsMap = idxProps.toMap
+     val tgtVertexId = if (tgtVertexIdInQualifier) {
+       idxPropsMap.get(LabelMeta.toSeq) match {
+         case None => tgtVertexIdRaw
+         case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+       }
+     } else tgtVertexIdRaw
+     // Value props override index props on key clashes (later entries win in toMap); timeStampSeq defaults to the cell timestamp.
+     val _mergedProps = (idxProps ++ props).toMap
+     val mergedProps =
+       if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+       else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
+
+     //    logger.error(s"$mergedProps")
+     //    val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong
+     // The edge and both vertices are stamped with the cell timestamp, not with the timeStampSeq prop.
+     val ts = kv.timestamp
+     IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
+
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
new file mode 100644
index 0000000..716b6fb
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -0,0 +1,53 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.wide
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.VertexId
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.hadoop.hbase.util.Bytes
+
+
+class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+   import StorageSerializable._
+   // Wide layout: per-edge data (index props, target id, op) goes into the qualifier rather than the row key.
+   val label = indexEdge.label
+   val table = label.hbaseTableName.getBytes()
+   val cf = Serializable.edgeCf
+   // Index ("order") props, as a lookup map and in encoded form.
+   val idxPropsMap = indexEdge.orders.toMap
+   val idxPropsBytes = propsToBytes(indexEdge.orders)
+
+   // Emits exactly one SKeyValue.
+   //   row       = source vertex id | label+direction | index seq (not inverted)
+   //   qualifier = empty for degree edges; index props | target id | op for incrementCount;
+   //               otherwise index props, plus the target id unless LabelMeta.toSeq is an index prop
+   //   value     = degree or count as a long for those two cases, else the encoded meta props
+   override def toKeyValues: Seq[SKeyValue] = {
+     val row = Bytes.add(
+       VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes,
+       indexEdge.labelWithDir.bytes,
+       labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false))
+     val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
+
+     // The op comparison is only evaluated for non-degree edges, hence lazy.
+     val isDegree = indexEdge.degreeEdge
+     lazy val isIncrementCount = indexEdge.op == GraphUtil.operations("incrementCount")
+
+     val qualifier =
+       if (isDegree) Array.empty[Byte]
+       else if (isIncrementCount) Bytes.add(idxPropsBytes, tgtIdBytes, Array(indexEdge.op))
+       else if (idxPropsMap.contains(LabelMeta.toSeq)) idxPropsBytes
+       else Bytes.add(idxPropsBytes, tgtIdBytes)
+
+     val value =
+       if (isDegree)
+         Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
+       else if (isIncrementCount)
+         Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
+       else
+         propsToKeyValues(indexEdge.metas.toSeq)
+
+     val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
+     Seq(kv)
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
new file mode 100644
index 0000000..c97bed6
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -0,0 +1,84 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.tall
+
+import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue}
+import com.kakao.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId}
+import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+
+class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
+  // Splits one byte into (statusCode, op): high nibble via arithmetic shift (sign-preserving, so -8..7), low nibble masked to 0..15.
+  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
+    val statusCode = byte >> 4
+    val op = byte & ((1 << 4) - 1)
+    (statusCode.toByte, op.toByte)
+  }
+  // Rebuilds a SnapshotEdge from exactly one key-value: ids and label come from the row key (or cacheElementOpt), the rest from kv.value.
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                   _kvs: Seq[T],
+                                                   version: String,
+                                                   cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+    assert(kvs.size == 1)
+    // NOTE(review): the `version` argument is not used below; decoding relies on queryParam.label.schemaVersion.
+    val kv = kvs.head
+    val schemaVer = queryParam.label.schemaVersion
+    val cellVersion = kv.timestamp
+    /** rowKey */
+    def parseRowV3(kv: SKeyValue, version: String) = {
+      var pos = 0
+      val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
+      pos += srcIdAndTgtIdLen
+      val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+      pos += 4
+      val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+
+      val rowLen = srcIdAndTgtIdLen + 4 + 1 // id pair + 4-byte label/direction + 1 byte index seq/inverted flag
+      (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen)
+
+    }
+    val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
+      (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
+    }.getOrElse(parseRowV3(kv, schemaVer))
+    // NOTE(review): the target id is also wrapped in SourceVertexId - confirm this is intentional rather than TargetVertexId.
+    val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
+    val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
+    // Value as read below: 1 status/op byte, props (timeStampSeq is required), then optionally a pending edge: 1 status/op byte, props, 8-byte lock timestamp.
+    val (props, op, ts, statusCode, _pendingEdgeOpt) = {
+      var pos = 0
+      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+      pos += 1
+      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+      val kvsMap = props.toMap
+      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+      pos = endAt
+      val _pendingEdgeOpt =
+        if (pos == kv.value.length) None // no bytes left after the snapshot props => no pending edge
+        else {
+          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
+          pos += 1
+          //          val versionNum = Bytes.toLong(kv.value, pos, 8)
+          //          pos += 8
+          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+          pos = endAt
+          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+
+          val pendingEdge =
+            Edge(Vertex(srcVertexId, cellVersion),
+              Vertex(tgtVertexId, cellVersion),
+              labelWithDir, pendingEdgeOp,
+              cellVersion, pendingEdgeProps.toMap,
+              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+          Option(pendingEdge)
+        }
+
+      (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+    }
+    // Vertices carry the props timestamp (ts); the edge version is the cell timestamp; lockTs is only restored on the pending edge.
+    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+      labelWithDir, op, cellVersion, props, statusCode = statusCode,
+      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
new file mode 100644
index 0000000..a507b90
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -0,0 +1,47 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.tall
+
+import com.kakao.s2graph.core.SnapshotEdge
+import com.kakao.s2graph.core.mysqls.LabelIndex
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.SourceAndTargetVertexIdPair
+import org.apache.hadoop.hbase.util.Bytes
+
+
+class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
+  import StorageSerializable._
+  // Tall layout: source and target ids are both packed into the row key; the qualifier is empty.
+  val label = snapshotEdge.label
+  val table = label.hbaseTableName.getBytes()
+  val cf = Serializable.edgeCf
+
+  // Packs statusCode into the high nibble and op into the low nibble (assumes op fits in 4 bits - TODO confirm).
+  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] =
+    Array(((statusCode << 4) | op).toByte)
+
+  // Snapshot part of the value: the status/op byte followed by the edge props (with timestamps).
+  def valueBytes(): Array[Byte] = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
+    propsToKeyValuesWithTs(snapshotEdge.props.toList))
+
+  // Emits one SKeyValue. With a pending edge the value is extended by the pending edge's
+  // status/op byte, its props and its lock timestamp (a long).
+  override def toKeyValues: Seq[SKeyValue] = {
+    val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
+    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
+    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
+    val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+    val qualifier = Array.empty[Byte]
+
+    val value = snapshotEdge.pendingEdgeOpt match {
+      case None => valueBytes()
+      case Some(pendingEdge) =>
+        // Fail with a descriptive NoSuchElementException when the pending edge carries no lock timestamp.
+        val lockTs = pendingEdge.lockTs.getOrElse(
+          throw new NoSuchElementException("pending edge has no lockTs; cannot serialize snapshot edge"))
+        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
+        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
+        Bytes.add(valueBytes(), opBytes, Bytes.add(propsBytes, Bytes.toBytes(lockTs)))
+    }
+
+    Seq(SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
new file mode 100644
index 0000000..1174f50
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -0,0 +1,70 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.wide
+
+import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable}
+import com.kakao.s2graph.core.types.TargetVertexId
+import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+
+class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
+  // Splits one byte into (statusCode, op): high nibble via arithmetic shift (sign-preserving, so -8..7), low nibble masked to 0..15.
+  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
+    val statusCode = byte >> 4
+    val op = byte & ((1 << 4) - 1)
+    (statusCode.toByte, op.toByte)
+  }
+  // Rebuilds a SnapshotEdge from exactly one key-value: source id and label from the row (or cacheElementOpt), target id from the qualifier, the rest from kv.value.
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                   _kvs: Seq[T],
+                                                   version: String,
+                                                   cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+    assert(kvs.size == 1)
+    // NOTE(review): the `version` argument is not used below; decoding relies on queryParam.label.schemaVersion.
+    val kv = kvs.head
+    val schemaVer = queryParam.label.schemaVersion
+    val cellVersion = kv.timestamp
+    // parseRow is not defined in this class; presumably it comes from the StorageDeserializable._ import or the Deserializable parent - confirm.
+    val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
+      (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
+    }.getOrElse(parseRow(kv, schemaVer))
+    // Value as read below: 1 status/op byte, props (timeStampSeq is required), then optionally a pending edge: 1 status/op byte, props, 8-byte lock timestamp.
+    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
+      val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
+      var pos = 0
+      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+      pos += 1
+      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+      val kvsMap = props.toMap
+      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+      pos = endAt
+      val _pendingEdgeOpt =
+        if (pos == kv.value.length) None // no bytes left after the snapshot props => no pending edge
+        else {
+          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
+          pos += 1
+          //          val versionNum = Bytes.toLong(kv.value, pos, 8)
+          //          pos += 8
+          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+          pos = endAt
+          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+
+          val pendingEdge =
+            Edge(Vertex(srcVertexId, cellVersion),
+              Vertex(tgtVertexId, cellVersion),
+              labelWithDir, pendingEdgeOp,
+              cellVersion, pendingEdgeProps.toMap,
+              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+          Option(pendingEdge)
+        }
+
+      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+    }
+    // Vertices carry the props timestamp (ts); the edge version is the cell timestamp; lockTs is only restored on the pending edge.
+    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+      labelWithDir, op, cellVersion, props, statusCode = statusCode,
+      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
new file mode 100644
index 0000000..e6074d9
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -0,0 +1,50 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.wide
+
+import com.kakao.s2graph.core.SnapshotEdge
+import com.kakao.s2graph.core.mysqls.LabelIndex
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.VertexId
+import org.apache.hadoop.hbase.util.Bytes
+
+
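+/**
+ * Serializes a SnapshotEdge into one SKeyValue (wide layout: the target vertex id is stored in the qualifier).
+ * @param snapshotEdge the snapshot edge to serialize
+ */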
+class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
+  import StorageSerializable._
+  // Wide layout: the row key holds the source vertex id and the qualifier holds the target vertex id.
+  val label = snapshotEdge.label
+  val table = label.hbaseTableName.getBytes()
+  val cf = Serializable.edgeCf
+
+  // Packs statusCode into the high nibble and op into the low nibble (assumes op fits in 4 bits - TODO confirm).
+  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] =
+    Array(((statusCode << 4) | op).toByte)
+
+  // Snapshot part of the value: the status/op byte followed by the edge props (with timestamps).
+  def valueBytes(): Array[Byte] = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
+    propsToKeyValuesWithTs(snapshotEdge.props.toList))
+
+  // Emits one SKeyValue; a pending edge appends its status/op byte, props and lock timestamp (a long) to the value.
+  override def toKeyValues: Seq[SKeyValue] = {
+    val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes
+    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
+    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
+    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+    val qualifier = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
+
+    val value = snapshotEdge.pendingEdgeOpt match {
+      case None => valueBytes()
+      case Some(pendingEdge) =>
+        // Fail with a descriptive NoSuchElementException when the pending edge carries no lock timestamp.
+        val lockTs = pendingEdge.lockTs.getOrElse(
+          throw new NoSuchElementException("pending edge has no lockTs; cannot serialize snapshot edge"))
+        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
+        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
+        Bytes.add(valueBytes(), opBytes, Bytes.add(propsBytes, Bytes.toBytes(lockTs)))
+    }
+
+    Seq(SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
new file mode 100644
index 0000000..e355401
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -0,0 +1,46 @@
+package com.kakao.s2graph.core.storage.serde.vertex
+
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable}
+import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
+import com.kakao.s2graph.core.{QueryParam, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+import scala.collection.mutable.ListBuffer
+
+/** Rebuilds a [[Vertex]] from the key-values read for a single vertex row. */
+class VertexDeserializable extends Deserializable[Vertex] {
+  /**
+   * @param queryParam      not used by this implementation
+   * @param _kvs            cells of the vertex; must be non-empty because the id is parsed from the first row key
+   * @param version         schema version handed to the VertexId and InnerVal decoders
+   * @param cacheElementOpt not used by this implementation
+   * @return a vertex constructed with the largest cell timestamp, the decoded props and the label ids
+   */
+  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                     _kvs: Seq[T],
+                                     version: String,
+                                     cacheElementOpt: Option[Vertex]): Vertex = {
+    val toCell = implicitly[CanSKeyValue[T]]
+    val cells = _kvs.map { raw => toCell.toSKeyValue(raw) }
+    val rowKey = cells.head.row
+    val (vertexId, _) = VertexId.fromBytes(rowKey, 0, rowKey.length, version)
+
+    var latestTs = Long.MinValue
+    val props = new collection.mutable.HashMap[Int, InnerValLike]
+    val labelIds = new ListBuffer[Int]
+    cells.foreach { cell =>
+      // A one-byte qualifier is the key itself; any other length is decoded with Bytes.toInt.
+      val qualifier = cell.qualifier
+      val key = if (qualifier.length != 1) Bytes.toInt(qualifier) else qualifier.head.toInt
+      latestTs = math.max(latestTs, cell.timestamp)
+
+      // Keys flagged by Vertex.isLabelId contribute a label id; any other cell's value is decoded as a prop.
+      if (!Vertex.isLabelId(key)) {
+        val bytes = cell.value
+        props += (key -> InnerVal.fromBytes(bytes, 0, bytes.length, version)._1)
+      } else labelIds += Vertex.toLabelId(key)
+    }
+    assert(latestTs != Long.MinValue)
+    Vertex(vertexId, latestTs, props.toMap, belongLabelIds = labelIds)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
new file mode 100644
index 0000000..0c17592
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -0,0 +1,20 @@
+package com.kakao.s2graph.core.storage.serde.vertex
+
+import com.kakao.s2graph.core.Vertex
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable}
+import org.apache.hadoop.hbase.util.Bytes
+
+
+/** Serializes a [[Vertex]] into one SKeyValue per property and one per label id it belongs to. */
+case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
+  val cf = Serializable.vertexCf
+  /** Row = vertex id bytes; qualifier = encoded prop key; label cells carry an empty value; all use vertex.ts. */
+  override def toKeyValues: Seq[SKeyValue] = {
+    val table = Bytes.toBytes(vertex.hbaseTableName) // UTF-8, unlike String.getBytes (platform default charset)
+    val row = vertex.id.bytes
+    // Go through a Seq so cells are never collected in a Map keyed by identity-compared byte arrays.
+    val base = (vertex.props ++ vertex.defaultProps).toSeq.map { case (k, v) => Bytes.toBytes(k) -> v.bytes }
+    val belongsTo = vertex.belongLabelIds.map { id => Bytes.toBytes(Vertex.toPropKey(id)) -> Array.empty[Byte] }
+    (base ++ belongsTo).map { case (qualifier, value) => SKeyValue(table, row, cf, qualifier, value, vertex.ts) }
+  }
+}