Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 16:33:36 UTC

[3/7] incubator-s2graph git commit: [S2GRAPH-122]: Change data types of Edge/IndexEdge/SnapshotEdge.

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
index 69926fa..d2a7de7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core.storage
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.QueryParam
+import org.apache.s2graph.core.mysqls.{LabelMeta, Label}
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, InnerValLikeWithTs}
 import org.apache.s2graph.core.utils.logger
 
@@ -36,16 +37,17 @@ object StorageDeserializable {
   def bytesToKeyValues(bytes: Array[Byte],
                        offset: Int,
                        length: Int,
-                       version: String): (Array[(Byte, InnerValLike)], Int) = {
+                       schemaVer: String,
+                       label: Label): (Array[(LabelMeta, InnerValLike)], Int) = {
     var pos = offset
     val len = bytes(pos)
     pos += 1
-    val kvs = new Array[(Byte, InnerValLike)](len)
+    val kvs = new Array[(LabelMeta, InnerValLike)](len)
     var i = 0
     while (i < len) {
-      val k = bytes(pos)
+      val k = label.labelMetaMap(bytes(pos))
       pos += 1
-      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
+      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer)
       pos += numOfBytesUsed
       kvs(i) = (k -> v)
       i += 1
@@ -57,16 +59,17 @@ object StorageDeserializable {
 
   def bytesToKeyValuesWithTs(bytes: Array[Byte],
                              offset: Int,
-                             version: String): (Array[(Byte, InnerValLikeWithTs)], Int) = {
+                             schemaVer: String,
+                             label: Label): (Array[(LabelMeta, InnerValLikeWithTs)], Int) = {
     var pos = offset
     val len = bytes(pos)
     pos += 1
-    val kvs = new Array[(Byte, InnerValLikeWithTs)](len)
+    val kvs = new Array[(LabelMeta, InnerValLikeWithTs)](len)
     var i = 0
     while (i < len) {
-      val k = bytes(pos)
+      val k = label.labelMetaMap(bytes(pos))
       pos += 1
-      val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, version)
+      val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, schemaVer)
       pos += numOfBytesUsed
       kvs(i) = (k -> v)
       i += 1
@@ -78,15 +81,15 @@ object StorageDeserializable {
 
   def bytesToProps(bytes: Array[Byte],
                    offset: Int,
-                   version: String): (Array[(Byte, InnerValLike)], Int) = {
+                   schemaVer: String): (Array[(LabelMeta, InnerValLike)], Int) = {
     var pos = offset
     val len = bytes(pos)
     pos += 1
-    val kvs = new Array[(Byte, InnerValLike)](len)
+    val kvs = new Array[(LabelMeta, InnerValLike)](len)
     var i = 0
     while (i < len) {
-      val k = HBaseType.EMPTY_SEQ_BYTE
-      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
+      val k = LabelMeta.empty
+      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer)
       pos += numOfBytesUsed
       kvs(i) = (k -> v)
       i += 1
@@ -98,17 +101,19 @@ object StorageDeserializable {
   }
 
   def bytesToLong(bytes: Array[Byte], offset: Int): Long = Bytes.toLong(bytes, offset)
+
+  def bytesToInt(bytes: Array[Byte], offset: Int): Int = Bytes.toInt(bytes, offset)
 }
 
 trait StorageDeserializable[E] {
-  def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): Option[E] = {
+  def fromKeyValues[T: CanSKeyValue](checkLabel: Option[Label], kvs: Seq[T], version: String, cacheElementOpt: Option[E]): Option[E] = {
     try {
-      Option(fromKeyValuesInner(queryParam, kvs, version, cacheElementOpt))
+      Option(fromKeyValuesInner(checkLabel, kvs, version, cacheElementOpt))
     } catch {
       case e: Exception =>
         logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
         None
     }
   }
-  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): E
+  def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], kvs: Seq[T], version: String, cacheElementOpt: Option[E]): E
 }
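
For context on this hunk: the on-disk layout that bytesToKeyValues parses is untouched by the commit; a property block is still one count byte followed by, per entry, a one-byte property seq and a variable-length InnerVal. Only the in-memory key changes, from the raw seq Byte to the LabelMeta resolved through label.labelMetaMap. Below is a minimal sketch of that decode loop, with a fixed 8-byte long standing in for InnerVal.fromBytes (MetaStub and decodeProps are illustrative names, not part of the codebase):

    import org.apache.hadoop.hbase.util.Bytes

    // Illustrative stand-in for LabelMeta.
    case class MetaStub(seq: Byte, name: String)

    // Decode loop shape of bytesToKeyValues: one count byte, then per entry
    // a seq byte and a value. Here the value is a fixed 8-byte long where the
    // real code calls InnerVal.fromBytes with the schema version.
    def decodeProps(bytes: Array[Byte], offset: Int,
                    resolve: Byte => MetaStub): (Array[(MetaStub, Long)], Int) = {
      var pos = offset
      val len = bytes(pos)                // entry count in the first byte
      pos += 1
      val kvs = new Array[(MetaStub, Long)](len)
      var i = 0
      while (i < len) {
        val meta = resolve(bytes(pos))    // seq byte -> LabelMeta via label.labelMetaMap
        pos += 1
        val v = Bytes.toLong(bytes, pos)  // stand-in for InnerVal.fromBytes
        pos += 8
        kvs(i) = meta -> v
        i += 1
      }
      (kvs, pos)
    }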

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
index b7326f5..c1efe7b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
@@ -20,31 +20,33 @@
 package org.apache.s2graph.core.storage
 
 import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
 
 object StorageSerializable {
   /** serializer */
-  def propsToBytes(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
+  def propsToBytes(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = {
     val len = props.length
     assert(len < Byte.MaxValue)
     var bytes = Array.fill(1)(len.toByte)
-    for ((k, v) <- props) bytes = Bytes.add(bytes, v.bytes)
+    for ((_, v) <- props) bytes = Bytes.add(bytes, v.bytes)
     bytes
   }
 
-  def propsToKeyValues(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
+  def propsToKeyValues(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = {
     val len = props.length
     assert(len < Byte.MaxValue)
     var bytes = Array.fill(1)(len.toByte)
-    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
+    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes)
     bytes
   }
 
-  def propsToKeyValuesWithTs(props: Seq[(Byte, InnerValLikeWithTs)]): Array[Byte] = {
+  def propsToKeyValuesWithTs(props: Seq[(LabelMeta, InnerValLikeWithTs)]): Array[Byte] = {
     val len = props.length
     assert(len < Byte.MaxValue)
     var bytes = Array.fill(1)(len.toByte)
-    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
+    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes)
     bytes
   }
 
@@ -53,13 +55,17 @@ object StorageSerializable {
     val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0)
     Array.fill(1)(byte.toByte)
   }
+
+  def intToBytes(value: Int): Array[Byte] = Bytes.toBytes(value)
+
+  def longToBytes(value: Long): Array[Byte] = Bytes.toBytes(value)
 }
 
 trait StorageSerializable[E] {
   val cf = Serializable.edgeCf
 
-  val table: Array[Byte]
-  val ts: Long
+  def table: Array[Byte]
+  def ts: Long
 
   def toRowKey: Array[Byte]
   def toQualifier: Array[Byte]
@@ -70,7 +76,7 @@ trait StorageSerializable[E] {
     val qualifier = toQualifier
     val value = toValue
     val kv = SKeyValue(table, row, cf, qualifier, value, ts)
-
+//    logger.debug(s"[SER]: ${kv.toLogString}")
     Seq(kv)
   }
 }
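
The serializer half mirrors the change: propsToKeyValues still writes the count byte and then, per entry, the one-byte seq followed by the value bytes; the seq now comes from LabelMeta.seq instead of a raw Byte, so nothing changes on disk (propsToBytes is the variant that omits the seq byte). A hedged round-trip counterpart to the decode sketch above, reusing the illustrative MetaStub:

    import org.apache.hadoop.hbase.util.Bytes

    def encodeProps(props: Seq[(MetaStub, Long)]): Array[Byte] = {
      assert(props.length < Byte.MaxValue)   // the count must fit in one byte
      var bytes = Array(props.length.toByte)
      for ((k, v) <- props)
        bytes = Bytes.add(bytes, Array(k.seq), Bytes.toBytes(v))  // seq byte, then value
      bytes
    }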

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index b52ba53..e63dfea 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -19,11 +19,13 @@
 
 package org.apache.s2graph.core.storage.hbase
 
+
+
 import java.util
 import java.util.Base64
 
-import com.stumbleupon.async.Deferred
-import com.typesafe.config.{Config, ConfigFactory}
+import com.stumbleupon.async.{Callback, Deferred}
+import com.typesafe.config.Config
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
@@ -35,15 +37,17 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange}
 import org.apache.s2graph.core.types.{HBaseType, VertexId}
 import org.apache.s2graph.core.utils._
 import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
 import org.hbase.async._
 
 import scala.collection.JavaConversions._
-import scala.collection.{Map, Seq}
-import scala.concurrent.duration.Duration
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.Try
 import scala.util.hashing.MurmurHash3
 
 
@@ -79,12 +83,15 @@ object AsynchbaseStorage {
     logger.info(s"Asynchbase: ${client.getConfig.dumpConfiguration()}")
     client
   }
+
+  case class ScanWithRange(scan: Scanner, offset: Int, limit: Int)
+  type AsyncRPC = Either[GetRequest, ScanWithRange]
 }
 
 
 class AsynchbaseStorage(override val graph: Graph,
                         override val config: Config)(implicit ec: ExecutionContext)
-  extends Storage[Deferred[StepInnerResult]](graph, config) {
+  extends Storage[AsyncRPC, Deferred[StepResult]](graph, config) {
 
   import Extensions.DeferOps
 
@@ -92,26 +99,41 @@ class AsynchbaseStorage(override val graph: Graph,
    * Asynchbase client setup.
   * note that we need two clients: one for bulk (withWait=false) and another for withWait=true
    */
-  val configWithFlush = config.withFallback(ConfigFactory.parseMap(Map("hbase.rpcs.buffered_flush_interval" -> "0")))
-  val client = AsynchbaseStorage.makeClient(config)
-
-  private val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
-  private val clients = Seq(client, clientWithFlush)
   private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort
+
+  /**
+   * since some runtime environments, such as Spark clusters, have issues with the Guava version used by Asynchbase.
+   * to avoid the version conflict, make these lazy vals so that clients that don't require an HBase client never initialize them.
+   */
+  lazy val client = AsynchbaseStorage.makeClient(config)
+  lazy val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
+  lazy val clients = Seq(client, clientWithFlush)
+
   private val emptyKeyValues = new util.ArrayList[KeyValue]()
+  private val emptyStepResult = new util.ArrayList[StepResult]()
+
   private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
 
+  import CanDefer._
+
   /** Future Cache to squash request */
-  private val futureCache = new DeferCache[StepInnerResult, Deferred, Deferred](config, StepInnerResult.Empty, "FutureCache", useMetric = true)
+  lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true)
 
   /** Simple Vertex Cache */
-  private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
+  lazy private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
 
   private val zkQuorum = config.getString("hbase.zookeeper.quorum")
   private val zkQuorumSlave =
-    if (config.hasPath("hbase.zookeeper.quorum")) Option(config.getString("hbase.zookeeper.quorum"))
+    if (config.hasPath("hbase.slave.zookeeper.quorum")) Option(config.getString("hbase.slave.zookeeper.quorum"))
     else None
 
+  /** v4 max next row size */
+  private val v4_max_num_rows = 10000
+  private def getV4MaxNumRows(limit : Int): Int = {
+    if (limit < v4_max_num_rows) limit
+    else v4_max_num_rows
+  }
+
   /**
   * fire rpcs into the proper hbase cluster using the client and
   * return true when all mutations succeed; otherwise return false.
@@ -120,37 +142,75 @@ class AsynchbaseStorage(override val graph: Graph,
     if (kvs.isEmpty) Future.successful(true)
     else {
       val _client = client(withWait)
-      val futures = kvs.map { kv =>
-        val _defer = kv.operation match {
-          case SKeyValue.Put => _client.put(new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp))
-          case SKeyValue.Delete =>
-            if (kv.qualifier == null) _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp))
-            else _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.timestamp))
-          case SKeyValue.Increment =>
-            _client.atomicIncrement(new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)))
-        }
-        val future = _defer.withCallback { ret => true }.recoverWith { ex =>
+      val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment)
+
+      /** Asynchbase IncrementRequest does not implement HasQualifiers */
+      val incrementsFutures = increments.map { kv =>
+        val inc = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
+        val defer = _client.atomicIncrement(inc)
+        val future = defer.toFuture(Long.box(0)).map(_ => true).recover { case ex: Exception =>
           logger.error(s"mutation failed. $kv", ex)
           false
-        }.toFuture
-
+        }
         if (withWait) future else Future.successful(true)
       }
 
-      Future.sequence(futures).map(_.forall(identity))
+      /** PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
+      val othersFutures = putAndDeletes.groupBy { kv =>
+        (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
+      }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) =>
+
+        val durability = groupedKeyValues.head.durability
+        val qualifiers = new ArrayBuffer[Array[Byte]]()
+        val values = new ArrayBuffer[Array[Byte]]()
+
+        groupedKeyValues.foreach { kv =>
+          if (kv.qualifier != null) qualifiers += kv.qualifier
+          if (kv.value != null) values += kv.value
+        }
+        val defer = operation match {
+          case SKeyValue.Put =>
+            val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp)
+            put.setDurable(durability)
+            _client.put(put)
+          case SKeyValue.Delete =>
+            val delete =
+              if (qualifiers.isEmpty)
+                new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp)
+              else
+                new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp)
+            delete.setDurable(durability)
+            _client.delete(delete)
+        }
+        if (withWait) {
+          defer.toFuture(new AnyRef()).map(_ => true).recover { case ex: Exception =>
+            groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) }
+            false
+          }
+        } else Future.successful(true)
+      }
+      for {
+        incrementRets <- Future.sequence(incrementsFutures)
+        otherRets <- Future.sequence(othersFutures)
+      } yield (incrementRets ++ otherRets).forall(identity)
     }
   }
 
-
-  override def fetchSnapshotEdgeKeyValues(hbaseRpc: AnyRef): Future[Seq[SKeyValue]] = {
-    val defer = fetchKeyValuesInner(hbaseRpc)
-    defer.toFuture.map { kvsArr =>
+  private def fetchKeyValues(rpc: AsyncRPC): Future[Seq[SKeyValue]] = {
+    val defer = fetchKeyValuesInner(rpc)
+    defer.toFuture(emptyKeyValues).map { kvsArr =>
       kvsArr.map { kv =>
         implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-      } toSeq
+      }
     }
   }
 
+  override def fetchSnapshotEdgeKeyValues(queryRequest: QueryRequest): Future[Seq[SKeyValue]] = {
+    val edge = toRequestEdge(queryRequest, Nil)
+    val rpc = buildRequest(queryRequest, edge)
+    fetchKeyValues(rpc)
+  }
+
   /**
   * since HBase natively provides CheckAndSet at the storage level, the implementation becomes simple.
   * @param rpc: key value that needs to be stored on storage.
@@ -162,7 +222,7 @@ class AsynchbaseStorage(override val graph: Graph,
   override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean] = {
     val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp)
     val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
-    client(withWait = true).compareAndSet(put, expected).withCallback(ret => ret.booleanValue()).toFuture
+    client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true)
   }
 
 
@@ -182,24 +242,24 @@ class AsynchbaseStorage(override val graph: Graph,
    * @param queryRequest
    * @return Scanner or GetRequest with proper setup with StartKey, EndKey, RangeFilter.
    */
-  override def buildRequest(queryRequest: QueryRequest): AnyRef = {
+  override def buildRequest(queryRequest: QueryRequest, edge: Edge): AsyncRPC = {
     import Serializable._
     val queryParam = queryRequest.queryParam
     val label = queryParam.label
-    val edge = toRequestEdge(queryRequest)
 
     val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
       val snapshotEdge = edge.toSnapshotEdge
       snapshotEdgeSerializer(snapshotEdge)
     } else {
-      val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.labelWithDir,
+      val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.label, edge.dir,
         edge.op, edge.version, queryParam.labelOrderSeq, edge.propsWithTs)
       indexEdgeSerializer(indexEdge)
     }
 
-    val (rowKey, qualifier) = (serializer.toRowKey, serializer.toQualifier)
+    val rowKey = serializer.toRowKey
+    val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
 
-    val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
+    val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
 
     label.schemaVersion match {
       case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
@@ -215,28 +275,26 @@ class AsynchbaseStorage(override val graph: Graph,
         val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
         val labelWithDirBytes = indexEdge.labelWithDir.bytes
         val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
-        //        val labelIndexSeqWithIsInvertedStopBytes =  StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = true)
-        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, Bytes.add(labelIndexSeqWithIsInvertedBytes, Array.fill(1)(edge.op)))
+        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
         val (startKey, stopKey) =
-          if (queryParam.columnRangeFilter != null) {
+          if (queryParam.intervalOpt.isDefined) {
             // interval is set.
             val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0))
-              case None => Bytes.add(baseKey, queryParam.columnRangeFilterMinBytes)
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
+              case None => Bytes.add(baseKey, intervalMaxBytes)
             }
-            (_startKey, Bytes.add(baseKey, queryParam.columnRangeFilterMaxBytes))
+            (_startKey , Bytes.add(baseKey, intervalMinBytes))
           } else {
-            /*
-             * note: since propsToBytes encode size of property map at first byte, we are sure about max value here
-             */
+            /**
+              * note: since propsToBytes encodes the size of the property map in its first byte, we can be sure about the max value here
+              */
             val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0))
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
               case None => baseKey
             }
             (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
           }
-//                logger.debug(s"[StartKey]: ${startKey.toList}")
-//                logger.debug(s"[StopKey]: ${stopKey.toList}")
 
         scanner.setStartKey(startKey)
         scanner.setStopKey(stopKey)
@@ -244,15 +302,23 @@ class AsynchbaseStorage(override val graph: Graph,
         if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
 
         scanner.setMaxVersions(1)
-        scanner.setMaxNumRows(queryParam.offset + queryParam.limit)
+        // TODO: exclusive condition innerOffset with cursorOpt
+        if (queryParam.cursorOpt.isDefined) {
+          scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit))
+        } else {
+          scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + queryParam.innerLimit))
+        }
         scanner.setMaxTimestamp(maxTs)
         scanner.setMinTimestamp(minTs)
-        scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
+        scanner.setRpcTimeout(queryParam.rpcTimeout)
+
         // SET option for this rpc properly.
-        scanner
+        if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit))
+        else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit))
+
       case _ =>
         val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
-          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
+          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, serializer.toQualifier)
         } else {
           new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
         }
@@ -261,12 +327,14 @@ class AsynchbaseStorage(override val graph: Graph,
         get.setFailfast(true)
         get.setMinTimestamp(minTs)
         get.setMaxTimestamp(maxTs)
-        get.setTimeout(queryParam.rpcTimeoutInMillis)
+        get.setTimeout(queryParam.rpcTimeout)
 
         val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset)
-        get.setFilter(new FilterList(pagination +: Option(queryParam.columnRangeFilter).toSeq, MUST_PASS_ALL))
-
-        get
+        val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
+          new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
+        }
+        get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL))
+        Left(get)
     }
   }
 
@@ -274,81 +342,81 @@ class AsynchbaseStorage(override val graph: Graph,
   * we are using a future cache to squash requests to the same key on storage.
    *
    * @param queryRequest
-   * @param prevStepScore
    * @param isInnerCall
    * @param parentEdges
   * @return we use Deferred here since it has much better performance compared to scala.concurrent.Future.
   *         it seems that map and flatMap on scala.concurrent.Future are slower than Deferred's addCallback
    */
   override def fetch(queryRequest: QueryRequest,
-                     prevStepScore: Double,
                      isInnerCall: Boolean,
-                     parentEdges: Seq[EdgeWithScore]): Deferred[StepInnerResult] = {
+                     parentEdges: Seq[EdgeWithScore]): Deferred[StepResult] = {
 
-    def fetchInner(hbaseRpc: AnyRef): Deferred[StepInnerResult] = {
-      val queryParam = queryRequest.queryParam
+    def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = {
+      val prevStepScore = queryRequest.prevStepScore
+      val fallbackFn: (Exception => StepResult) = { ex =>
+        logger.error(s"fetchInner failed. fallback return. $hbaseRpc", ex)
+        StepResult.Failure
+      }
 
-      fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
-        val (startOffset, length) = queryParam.label.schemaVersion match {
-          case HBaseType.VERSION4 => (queryParam.offset, queryParam.limit)
+      val queryParam = queryRequest.queryParam
+      fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs =>
+        val (startOffset, len) = queryParam.label.schemaVersion match {
+          case HBaseType.VERSION4 =>
+            val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset
+            (offset, queryParam.limit)
           case _ => (0, kvs.length)
         }
 
-        val edgeWithScores = toEdges(kvs, queryParam, prevStepScore, isInnerCall, parentEdges, startOffset, length)
-        if (edgeWithScores.isEmpty) StepInnerResult.Empty
-        else {
-          val head = edgeWithScores.head
-          val (degreeEdges, indexEdges) =
-            if (head.edge.isDegree) (Seq(head), edgeWithScores.tail)
-            else (Nil, edgeWithScores)
-
-          val normalized =
-            if (queryRequest.queryParam.shouldNormalize) normalize(indexEdges)
-            else indexEdges
-
-          val sampled = if (queryRequest.queryParam.sample >= 0) {
-            sample(queryRequest, normalized, queryRequest.queryParam.sample)
-          } else normalized
-
-          StepInnerResult(edgesWithScoreLs = sampled, degreeEdges)
-        }
-      } recoverWith { ex =>
-        logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
-        StepInnerResult.Failure
+        toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len)
       }
     }
 
     val queryParam = queryRequest.queryParam
     val cacheTTL = queryParam.cacheTTLInMillis
-    val request = buildRequest(queryRequest)
+    /** with version 4, the request is Right(ScanWithRange(scanner, offset, limit)); otherwise Left(GetRequest). */
+
+    val edge = toRequestEdge(queryRequest, parentEdges)
+    val request = buildRequest(queryRequest, edge)
+    val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
+    val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes)
 
     if (cacheTTL <= 0) fetchInner(request)
     else {
-      val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+      val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey)
+
+//      val cacheKeyBytes = toCacheKeyBytes(request)
       val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
       futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
     }
   }
 
+  override def fetches(queryRequests: Seq[QueryRequest],
+                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
+    val defers: Seq[Deferred[StepResult]] = for {
+      queryRequest <- queryRequests
+    } yield {
+        val queryOption = queryRequest.query.queryOption
+        val queryParam = queryRequest.queryParam
+        val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
+        val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+        fetch(queryRequest, isInnerCall = false, parentEdges)
+      }
 
-  override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)],
-                       prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[StepInnerResult]] = {
-    val defers: Seq[Deferred[StepInnerResult]] = for {
-      (queryRequest, prevStepScore) <- queryRequestWithScoreLs
-      parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
-    } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
-
-    val grouped: Deferred[util.ArrayList[StepInnerResult]] = Deferred.group(defers)
-    grouped withCallback {
-      queryResults: util.ArrayList[StepInnerResult] =>
-        queryResults.toIndexedSeq
-    } toFuture
+    val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers)
+    grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] =>
+      queryResults.toSeq
+    }.toFuture(emptyStepResult)
   }
 
 
-  def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]] = fetchSnapshotEdgeKeyValues(request)
+  def fetchVertexKeyValues(request: QueryRequest): Future[Seq[SKeyValue]] = {
+    val edge = toRequestEdge(request, Nil)
+    fetchKeyValues(buildRequest(request, edge))
+  }
 
 
+  def fetchVertexKeyValues(request: AsyncRPC): Future[Seq[SKeyValue]] = fetchKeyValues(request)
+
   /**
   * when withWait is given, we use the client with flushInterval set to 0.
   * otherwise we add extra wait time of up to flushInterval in the worst case.
@@ -357,35 +425,44 @@ class AsynchbaseStorage(override val graph: Graph,
    * @param withWait
    * @return
    */
-  override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = {
+  override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
+
     val _client = client(withWait)
-    val defers: Seq[Deferred[(Boolean, Long)]] = for {
+    val defers: Seq[Deferred[(Boolean, Long, Long)]] = for {
       edge <- edges
     } yield {
-        val edgeWithIndex = edge.edgesWithIndex.head
-        val countWithTs = edge.propsWithTs(LabelMeta.countSeq)
-        val countVal = countWithTs.innerVal.toString().toLong
-        val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head
-        val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
-        val defer = _client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long =>
-          (true, resultCount.longValue())
-        } recoverWith { ex =>
-          logger.error(s"mutation failed. $request", ex)
-          (false, -1L)
+        val futures: List[Deferred[(Boolean, Long, Long)]] = for {
+          relEdge <- edge.relatedEdges
+          edgeWithIndex <- relEdge.edgesWithIndexValid
+        } yield {
+          val countWithTs = edge.propsWithTs(LabelMeta.count)
+          val countVal = countWithTs.innerVal.toString().toLong
+          val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head
+          val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
+          val fallbackFn: (Exception => (Boolean, Long, Long)) = { ex =>
+            logger.error(s"mutation failed. $request", ex)
+            (false, -1L, -1L)
+          }
+          val defer = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long =>
+            (true, resultCount.longValue(), countVal)
+          }
+          if (withWait) defer
+          else Deferred.fromResult((true, -1L, -1L))
         }
-        if (withWait) defer
-        else Deferred.fromResult((true, -1L))
+
+        val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = Deferred.group(futures)
+        grouped.map(new util.ArrayList[(Boolean, Long, Long)]()) { resultLs => resultLs.head }
       }
 
-    val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = Deferred.groupInOrder(defers)
-    grouped.toFuture.map(_.toSeq)
+    val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = Deferred.groupInOrder(defers)
+    grouped.toFuture(new util.ArrayList[(Boolean, Long, Long)]()).map(_.toSeq)
   }
 
 
   override def flush(): Unit = clients.foreach { client =>
     super.flush()
     val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS)
-    Await.result(client.flush().toFuture, timeout)
+    Await.result(client.flush().toFuture(new AnyRef), timeout)
   }
 
 
@@ -394,40 +471,37 @@ class AsynchbaseStorage(override val graph: Graph,
                            cfs: List[String],
                            regionMultiplier: Int,
                            ttl: Option[Int],
-                           compressionAlgorithm: String): Unit = {
+                           compressionAlgorithm: String,
+                           replicationScopeOpt: Option[Int] = None,
+                           totalRegionCount: Option[Int] = None): Unit = {
    /** TODO: Decide if we will allow each app server to connect to multiple hbase clusters */
     for {
       zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
     } {
       logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
       val admin = getAdmin(zkAddr)
-      val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+      val regionCount = totalRegionCount.getOrElse(admin.getClusterStatus.getServersSize * regionMultiplier)
       try {
         if (!admin.tableExists(TableName.valueOf(tableName))) {
-          try {
-            val desc = new HTableDescriptor(TableName.valueOf(tableName))
-            desc.setDurability(Durability.ASYNC_WAL)
-            for (cf <- cfs) {
-              val columnDesc = new HColumnDescriptor(cf)
-                .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
-                .setBloomFilterType(BloomType.ROW)
-                .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-                .setMaxVersions(1)
-                .setTimeToLive(2147483647)
-                .setMinVersions(0)
-                .setBlocksize(32768)
-                .setBlockCacheEnabled(true)
-              if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
-              desc.addFamily(columnDesc)
-            }
-
-            if (regionCount <= 1) admin.createTable(desc)
-            else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
-          } catch {
-            case e: Throwable =>
-              logger.error(s"$zkAddr, $tableName failed with $e", e)
-              throw e
+          val desc = new HTableDescriptor(TableName.valueOf(tableName))
+          desc.setDurability(Durability.ASYNC_WAL)
+          for (cf <- cfs) {
+            val columnDesc = new HColumnDescriptor(cf)
+              .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+              .setBloomFilterType(BloomType.ROW)
+              .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+              .setMaxVersions(1)
+              .setTimeToLive(2147483647)
+              .setMinVersions(0)
+              .setBlocksize(32768)
+              .setBlockCacheEnabled(true)
+            if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+            if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get)
+            desc.addFamily(columnDesc)
           }
+
+          if (regionCount <= 1) admin.createTable(desc)
+          else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
         } else {
           logger.info(s"$zkAddr, $tableName, $cfs already exist.")
         }
@@ -445,12 +519,12 @@ class AsynchbaseStorage(override val graph: Graph,
 
   /** Asynchbase implementation override default getVertices to use future Cache */
   override def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
-    def fromResult(queryParam: QueryParam,
-                   kvs: Seq[SKeyValue],
+    def fromResult(kvs: Seq[SKeyValue],
                    version: String): Option[Vertex] = {
 
       if (kvs.isEmpty) None
-      else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None)
+      else vertexDeserializer.fromKeyValues(None, kvs, version, None)
+//        .map(S2Vertex(graph, _))
     }
 
     val futures = vertices.map { vertex =>
@@ -461,55 +535,85 @@ class AsynchbaseStorage(override val graph: Graph,
       get.maxVersions(1)
 
       val cacheKey = MurmurHash3.stringHash(get.toString)
-      vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 10000)(fetchVertexKeyValues(get)).map { kvs =>
-        fromResult(QueryParam.Empty, kvs, vertex.serviceColumn.schemaVersion)
+      vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 10000)(fetchVertexKeyValues(Left(get))).map { kvs =>
+        fromResult(kvs, vertex.serviceColumn.schemaVersion)
       }
     }
 
     Future.sequence(futures).map { result => result.toList.flatten }
   }
 
+  class V4ResultHandler(scanner: Scanner, defer: Deferred[util.ArrayList[KeyValue]], offset: Int, limit : Int) extends Callback[Object, util.ArrayList[util.ArrayList[KeyValue]]] {
+    val results = new util.ArrayList[KeyValue]()
+    var offsetCount = 0
 
+    override def call(kvsLs: util.ArrayList[util.ArrayList[KeyValue]]): Object = {
+      try {
+        if (kvsLs == null) {
+          defer.callback(results)
+          Try(scanner.close())
+        } else {
+          val curRet = new util.ArrayList[KeyValue]()
+          kvsLs.foreach(curRet.addAll(_))
+          val prevOffset = offsetCount
+          offsetCount += curRet.size()
+
+          val nextRet = if(offsetCount > offset){
+            if(prevOffset < offset ) {
+              curRet.subList(offset - prevOffset, curRet.size())
+            } else{
+              curRet
+            }
+          } else{
+            emptyKeyValues
+          }
 
+          val needCount = limit - results.size()
+          if (needCount >= nextRet.size()) {
+            results.addAll(nextRet)
+          } else {
+            results.addAll(nextRet.subList(0, needCount))
+          }
 
+          if (results.size() < limit) {
+            scanner.nextRows().addCallback(this)
+          } else {
+            defer.callback(results)
+            Try(scanner.close())
+          }
+        }
+      } catch{
+        case ex: Exception =>
+          logger.error(s"fetchKeyValuesInner failed.", ex)
+          defer.callback(ex)
+          Try(scanner.close())
+      }
+    }
+  }
 
   /**
    * Private methods which are specific to the Asynchbase implementation.
    */
-  private def fetchKeyValuesInner(rpc: AnyRef): Deferred[util.ArrayList[KeyValue]] = {
+  private def fetchKeyValuesInner(rpc: AsyncRPC): Deferred[util.ArrayList[KeyValue]] = {
     rpc match {
-      case getRequest: GetRequest => client.get(getRequest)
-      case scanner: Scanner =>
-        scanner.nextRows().withCallback { kvsLs =>
-          val ls = new util.ArrayList[KeyValue]
-          if (kvsLs == null) {
-
-          } else {
-            kvsLs.foreach { kvs =>
-              if (kvs != null) kvs.foreach { kv => ls.add(kv) }
-              else {
-
-              }
-            }
-          }
-          scanner.close()
-          ls
-        }.recoverWith { ex =>
-          logger.error(s"fetchKeyValuesInner failed.", ex)
-          scanner.close()
-          emptyKeyValues
-        }
+      case Left(get) => client.get(get)
+      case Right(ScanWithRange(scanner, offset, limit)) =>
+        val deferred = new Deferred[util.ArrayList[KeyValue]]()
+        scanner.nextRows().addCallback(new V4ResultHandler(scanner, deferred, offset, limit))
+        deferred
       case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc"))
     }
   }
 
-  private def toCacheKeyBytes(hbaseRpc: AnyRef): Array[Byte] = {
+  private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = {
+    /** with version 4, the request is Right(ScanWithRange(scanner, offset, limit)); otherwise Left(GetRequest). */
     hbaseRpc match {
-      case getRequest: GetRequest => getRequest.key()
-      case scanner: Scanner => scanner.getCurrentKey()
+      case Left(getRequest) => getRequest.key
+      case Right(ScanWithRange(scanner, offset, limit)) =>
+        Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit)))
       case _ =>
         logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc")
-        Array.empty[Byte]
+        throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc")
     }
   }
 
@@ -520,8 +624,6 @@ class AsynchbaseStorage(override val graph: Graph,
     val principal = config.getString("principal")
     val keytab = config.getString("keytab")
 
-
-
     System.setProperty("java.security.auth.login.config", jaas)
     System.setProperty("java.security.krb5.conf", krb5Conf)
     // System.setProperty("sun.security.krb5.debug", "true")
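
Three of the hunks above reward a closer look. First, mutateInner now separates increments (Asynchbase's IncrementRequest does not implement HasQualifiers) from puts and deletes, and groups the latter by (table, row, cf, operation, timestamp) so that each group becomes a single multi-qualifier PutRequest or DeleteRequest. A minimal sketch of just the grouping step (SKV is an illustrative stand-in for SKeyValue):

    // Stand-in for SKeyValue with only the fields the grouping key needs.
    case class SKV(table: Array[Byte], row: Array[Byte], cf: Array[Byte],
                   qualifier: Array[Byte], value: Array[Byte],
                   operation: Byte, timestamp: Long)

    // Arrays compare by reference, so the key wraps them in Seq (as the real
    // code does with .toSeq) to get structural equality inside groupBy.
    def groupForBatch(kvs: Seq[SKV]): Map[(Seq[Byte], Seq[Byte], Seq[Byte], Byte, Long), Seq[SKV]] =
      kvs.groupBy(kv => (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp))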
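
Second, requests are now a typed AsyncRPC = Either[GetRequest, ScanWithRange] instead of AnyRef, so fetchKeyValuesInner and toCacheKeyBytes dispatch with exhaustive matches, and scan cache keys fold in the offset and limit. An illustrative sketch with stand-in request types:

    import java.nio.ByteBuffer

    object RpcSketch {
      // Stand-ins for org.hbase.async.GetRequest / Scanner.
      case class Get(key: Array[Byte])
      case class Scan(currentKey: Array[Byte])
      case class ScanWithRange(scan: Scan, offset: Int, limit: Int)
      type AsyncRPC = Either[Get, ScanWithRange]

      // Folding offset/limit into the key keeps differently ranged scans over
      // the same rows from colliding in the future cache.
      def cacheKeyBytes(rpc: AsyncRPC): Array[Byte] = rpc match {
        case Left(get) => get.key
        case Right(ScanWithRange(scan, offset, limit)) =>
          val buf = ByteBuffer.allocate(scan.currentKey.length + 8)
          buf.put(scan.currentKey).putInt(offset).putInt(limit).array()
      }
    }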
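
Third, V4ResultHandler's paging is the subtlest piece: it keeps calling scanner.nextRows(), dropping `offset` rows across page boundaries and keeping at most `limit`. A pure sketch of that bookkeeping, under the assumption that pages arrive as an iterator (sliceAcrossPages is a made-up name):

    import scala.collection.mutable.ArrayBuffer

    def sliceAcrossPages[A](pages: Iterator[Seq[A]], offset: Int, limit: Int): Seq[A] = {
      val out = ArrayBuffer.empty[A]
      var seen = 0
      while (pages.hasNext && out.size < limit) {
        val page = pages.next()
        val prev = seen                   // rows seen before this page
        seen += page.size
        val visible =
          if (seen > offset) { if (prev < offset) page.drop(offset - prev) else page }
          else Seq.empty[A]
        out ++= visible.take(limit - out.size)
      }
      out.toSeq
    }

For example, with pages of sizes 3 and 4, an offset of 2 and a limit of 4 skip the first two rows of page one and return the next four.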

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index e2b7c2f..2428173 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -20,141 +20,125 @@
 package org.apache.s2graph.core.storage.serde.indexedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
-import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
+import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable}
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex}
 import scala.collection.immutable
 
+object IndexEdgeDeserializable{
+  def getNewInstance() = new IndexEdgeDeserializable()
+}
 class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
    import StorageDeserializable._
 
-   type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
-   type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+   type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
+   type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
 
-   private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-     //    val degree = Bytes.toLong(kv.value)
-     val degree = bytesToLongFunc(kv.value, 0)
-     val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
-     val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
-     (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
-   }
+   override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
+                                                    _kvs: Seq[T],
+                                                    schemaVer: String,
+                                                    cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+
+     assert(_kvs.size == 1)
+
+     //     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+     val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
+//     logger.debug(s"[DES]: ${kv.toLogString}")
 
-   private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-     var qualifierLen = 0
+     val version = kv.timestamp
+     //    logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
      var pos = 0
-     val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
-       val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+     val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer)
+     pos += srcIdLen
+     val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+     pos += 4
+     val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+     pos += 1
+
+     val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
+//     val op = kv.row(pos)
+//     pos += 1
+
+     if (pos == kv.row.length) {
+       // degree
+       //      val degreeVal = Bytes.toLong(kv.value)
+       val degreeVal = bytesToLongFunc(kv.value, 0)
+       val ts = kv.timestamp
+       val tsInnerValLikeWithTs = InnerValLikeWithTs.withLong(ts, ts, schemaVer)
+       val props = Map(LabelMeta.timestamp -> tsInnerValLikeWithTs,
+         LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer))
+       val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
+       IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), label, labelWithDir.dir, GraphUtil.defaultOpByte, ts, labelIdxSeq, props,  tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
+     } else {
+       // not degree edge
+       val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
        pos = endAt
-       qualifierLen += endAt
-       val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+
+
+       val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length - 1) {
          (HBaseType.defaultTgtVertexId, 0)
        } else {
-         TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
+         TargetVertexId.fromBytes(kv.row, endAt, kv.row.length - 1, schemaVer)
        }
-       qualifierLen += tgtVertexIdLen
-       (props, endAt, tgtVertexId, tgtVertexIdLen)
-     }
-     val (op, opLen) =
-       if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
-       else (kv.qualifier(qualifierLen), 1)
+       val op = kv.row(kv.row.length-1)
+
+       val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs]
+       val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
+
+       /** process indexProps */
+       val size = idxPropsRaw.length
+       (0 until size).foreach { ith =>
+         val meta = index.sortKeyTypesArray(ith)
+         val (k, v) = idxPropsRaw(ith)
+         if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
+         else allProps += meta -> InnerValLikeWithTs(v, version)
+       }
+//       for {
+//         (meta, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw)
+//       } {
+//         if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
+//         else {
+//           allProps += meta -> InnerValLikeWithTs(v, version)
+//         }
+//       }
+
+       /** process props */
+       if (op == GraphUtil.operations("incrementCount")) {
+         //        val countVal = Bytes.toLong(kv.value)
+         val countVal = bytesToLongFunc(kv.value, 0)
+         allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+       } else {
+         val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label)
+         props.foreach { case (k, v) =>
+           allProps += (k -> InnerValLikeWithTs(v, version))
+         }
+       }
+       val _mergedProps = allProps.result()
+       val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match {
+         case None =>
+           val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer)
+           val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
+           (mergedProps, tsInnerVal)
+         case Some(tsInnerVal) =>
+           (_mergedProps, tsInnerVal)
+       }
+//       val mergedProps =
+//         if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps
+//         else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
 
-     qualifierLen += opLen
+       /** process tgtVertexId */
+       val tgtVertexId =
+         mergedProps.get(LabelMeta.to) match {
+           case None => tgtVertexIdRaw
+           case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+         }
 
-     (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
-   }
 
-   private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
-     val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
-     (props, endAt)
-   }
+       IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
 
-   private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
-     (Array.empty[(Byte, InnerValLike)], 0)
+     }
    }
-
-  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                                   _kvs: Seq[T],
-                                                   schemaVer: String,
-                                                   cacheElementOpt: Option[IndexEdge]): IndexEdge = {
-
-    assert(_kvs.size == 1)
-
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-    val kv = kvs.head
-    val version = kv.timestamp
-    //    logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
-    var pos = 0
-    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer)
-    pos += srcIdLen
-    val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-    pos += 4
-    val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-    pos += 1
-
-    val op = kv.row(pos)
-    pos += 1
-
-    if (pos == kv.row.length) {
-      // degree
-      //      val degreeVal = Bytes.toLong(kv.value)
-      val degreeVal = bytesToLongFunc(kv.value, 0)
-      val ts = kv.timestamp
-      val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer),
-        LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer))
-      val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
-      IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
-    } else {
-      // not degree edge
-
-
-      val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
-      pos = endAt
-      val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
-        (HBaseType.defaultTgtVertexId, 0)
-      } else {
-        TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer)
-      }
-
-      val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
-      val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
-      /* process indexProps */
-      for {
-        (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
-      } {
-        if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
-        else allProps += seq -> InnerValLikeWithTs(v, version)
-      }
-
-      /* process props */
-      if (op == GraphUtil.operations("incrementCount")) {
-        //        val countVal = Bytes.toLong(kv.value)
-        val countVal = bytesToLongFunc(kv.value, 0)
-        allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
-      } else {
-        val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
-        props.foreach { case (k, v) =>
-          allProps += (k -> InnerValLikeWithTs(v, version))
-        }
-      }
-      val _mergedProps = allProps.result()
-      val mergedProps =
-        if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
-        else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-
-      /* process tgtVertexId */
-      val tgtVertexId =
-        mergedProps.get(LabelMeta.toSeq) match {
-          case None => tgtVertexIdRaw
-          case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
-        }
-
-
-      IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
-
-    }
-  }
-}
+ }
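
The tall-format deserializer above now reads everything from the row key: the source vertex id, a 4-byte LabelWithDirection, one packed byte of (labelIdxSeq << 1 | isInverted), then, for non-degree rows, the index props, the target vertex id, and a trailing op byte read as row(row.length - 1); a row that ends right after the packed byte is a degree edge. A toy sketch of just the fixed-width header split, assuming an 8-byte source id for brevity (real ids are variable-length; all names here are illustrative):

    import java.nio.ByteBuffer

    case class TallHeader(srcId: Long, labelWithDir: Int,
                          labelIdxSeq: Byte, isInverted: Boolean, nextPos: Int)

    def parseTallHeader(row: Array[Byte]): TallHeader = {
      val buf = ByteBuffer.wrap(row)
      val srcId = buf.getLong()           // stand-in for SourceVertexId.fromBytes
      val labelWithDir = buf.getInt()     // 4-byte label id + direction
      val b = buf.get()                   // (labelIdxSeq << 1 | isInverted)
      TallHeader(srcId, labelWithDir, (b >> 1).toByte, (b & 1) == 1, buf.position())
    }

    // A row that carries nothing after the header is a degree edge.
    def isDegreeRow(row: Array[Byte], header: TallHeader): Boolean =
      header.nextPos == row.length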

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
index a76bd1f..cd242dc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -21,16 +21,16 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.mysqls.LabelMeta
-import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import org.apache.s2graph.core.storage.{Serializable, StorageSerializable}
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.s2graph.core.storage.StorageSerializable._
 
-
-class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byte] = longToBytes) extends Serializable[IndexEdge] {
    import StorageSerializable._
 
-   override val ts = indexEdge.version
-   override val table = indexEdge.label.hbaseTableName.getBytes()
+   override def ts = indexEdge.version
+   override def table = indexEdge.label.hbaseTableName.getBytes()
 
    def idxPropsMap = indexEdge.orders.toMap
    def idxPropsBytes = propsToBytes(indexEdge.orders)
@@ -43,25 +43,25 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
      val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
      //    logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
 
-     val qualifier =
-       if (indexEdge.degreeEdge) Array.empty[Byte]
-       else
-         idxPropsMap.get(LabelMeta.toSeq) match {
+     if (indexEdge.degreeEdge) row
+     else {
+       val qualifier = idxPropsMap.get(LabelMeta.to) match {
            case None => Bytes.add(idxPropsBytes, VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes)
            case Some(vId) => idxPropsBytes
          }
 
-     /* TODO search usage of op byte. if there is no, then remove opByte */
-     Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier)
+       val opByte = if (indexEdge.op == GraphUtil.operations("incrementCount")) indexEdge.op else GraphUtil.defaultOpByte
+       Bytes.add(row, qualifier, Array.fill(1)(opByte))
+     }
    }
 
    override def toQualifier: Array[Byte] = Array.empty[Byte]
 
    override def toValue: Array[Byte] =
      if (indexEdge.degreeEdge)
-       Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
+       longToBytes(indexEdge.props(LabelMeta.degree).innerVal.toString().toLong)
      else if (indexEdge.op == GraphUtil.operations("incrementCount"))
-       Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
+       longToBytes(indexEdge.props(LabelMeta.count).innerVal.toString().toLong)
      else propsToKeyValues(indexEdge.metas.toSeq)
 
  }
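
The serializer mirrors the deserializer's new expectation: the op byte moves from just after the index byte to the very end of the row key, and keeps the real op only for incrementCount (defaultOpByte otherwise), which is what makes the row(row.length - 1) read above safe. A hedged sketch of the new assembly order (helper name and parameters are illustrative):

    import org.apache.hadoop.hbase.util.Bytes

    // row = [srcId | labelWithDir | idx+inverted] ++ [idxProps | tgtId] ++ [op]
    def buildTallRow(base: Array[Byte], qualifier: Array[Byte],
                     op: Byte, incrementCountOp: Byte, defaultOpByte: Byte): Array[Byte] = {
      val opByte = if (op == incrementCountOp) op else defaultOpByte
      Bytes.add(base, qualifier, Array(opByte))  // op byte now trails the qualifier
    }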

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index eb3d765..534667b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -19,122 +19,126 @@
 
 package org.apache.s2graph.core.storage.serde.indexedge.wide
 
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
-import scala.collection.immutable
-
-import scala.collection.immutable
-
+import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex}
 import scala.collection.immutable
 
 class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
-
-
-  import StorageDeserializable._
-
-  type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
-  type ValueRaw = (Array[(Byte, InnerValLike)], Int)
-
-  private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-    //    val degree = Bytes.toLong(kv.value)
-    val degree = bytesToLongFunc(kv.value, 0)
-    val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
-    val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
-    (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
-  }
-
-  private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-    var qualifierLen = 0
-    var pos = 0
-    val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
-      val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
-      pos = endAt
-      qualifierLen += endAt
-      val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
-        (HBaseType.defaultTgtVertexId, 0)
-      } else {
-        TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
-      }
-      qualifierLen += tgtVertexIdLen
-      (props, endAt, tgtVertexId, tgtVertexIdLen)
-    }
-    val (op, opLen) =
-      if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
-      else (kv.qualifier(qualifierLen), 1)
-
-    qualifierLen += opLen
-
-    (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
-  }
-
-  private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
-    val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
-    (props, endAt)
-  }
-
-  private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
-    (Array.empty[(Byte, InnerValLike)], 0)
-  }
-
-  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                                   _kvs: Seq[T],
-                                                   schemaVer: String,
-                                                   cacheElementOpt: Option[IndexEdge]): IndexEdge = {
-    assert(_kvs.size == 1)
-
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-    val kv = kvs.head
-    val version = kv.timestamp
-
-    val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
-      (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
-    }.getOrElse(parseRow(kv, schemaVer))
-
-    val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
-      if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
-      else parseQualifier(kv, schemaVer)
-
-    val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
-    val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
-    /* process indexProps */
-    for {
-      (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
-    } {
-      if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
-      else allProps += seq -> InnerValLikeWithTs(v, version)
-    }
-
-    /* process props */
-    if (op == GraphUtil.operations("incrementCount")) {
-      //      val countVal = Bytes.toLong(kv.value)
-      val countVal = bytesToLongFunc(kv.value, 0)
-      allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
-    } else if (kv.qualifier.isEmpty) {
-      val countVal = bytesToLongFunc(kv.value, 0)
-      allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
-    } else {
-      val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
-      props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) }
-    }
-
-    val _mergedProps = allProps.result()
-    val mergedProps =
-      if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
-      else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-
-    /* process tgtVertexId */
-    val tgtVertexId =
-      mergedProps.get(LabelMeta.toSeq) match {
-        case None => tgtVertexIdRaw
-        case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
-      }
-
-    IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
-
-  }
-}
+   import StorageDeserializable._
+
+   type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
+   type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
+
+   private def parseDegreeQualifier(kv: SKeyValue, schemaVer: String): QualifierRaw = {
+     //    val degree = Bytes.toLong(kv.value)
+     val degree = bytesToLongFunc(kv.value, 0)
+     val idxPropsRaw = Array(LabelMeta.degree -> InnerVal.withLong(degree, schemaVer))
+     val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
+     (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+   }
+
+   private def parseQualifier(kv: SKeyValue, schemaVer: String): QualifierRaw = {
+     var qualifierLen = 0
+     var pos = 0
+     val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+       val (props, endAt) = bytesToProps(kv.qualifier, pos, schemaVer)
+       pos = endAt
+       qualifierLen += endAt
+       val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+         (HBaseType.defaultTgtVertexId, 0)
+       } else {
+         TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, schemaVer)
+       }
+       qualifierLen += tgtVertexIdLen
+       (props, endAt, tgtVertexId, tgtVertexIdLen)
+     }
+     val (op, opLen) =
+       if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+       else (kv.qualifier(qualifierLen), 1)
+
+     qualifierLen += opLen
+
+     (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+   }
+
+   override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
+                                                    _kvs: Seq[T],
+                                                    schemaVer: String,
+                                                    cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+     assert(_kvs.size == 1)
+
+//     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+     val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
+     val version = kv.timestamp
+
+     val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
+       (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+     }.getOrElse(parseRow(kv, schemaVer))
+
+     val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
+
+     val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
+       if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
+       else parseQualifier(kv, schemaVer)
+
+     val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs]
+     val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
+
+     /** process indexProps */
+     val size = idxPropsRaw.length
+     (0 until size).foreach { ith =>
+       val meta = index.sortKeyTypesArray(ith)
+       val (k, v) = idxPropsRaw(ith)
+       if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
+       else allProps += meta -> InnerValLikeWithTs(v, version)
+     }
+//     for {
+//       (seq, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw)
+//     } {
+//       if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
+//       else allProps += seq -> InnerValLikeWithTs(v, version)
+//     }
+
+     /** process props */
+     if (op == GraphUtil.operations("incrementCount")) {
+       //      val countVal = Bytes.toLong(kv.value)
+       val countVal = bytesToLongFunc(kv.value, 0)
+       allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+     } else if (kv.qualifier.isEmpty) {
+       val countVal = bytesToLongFunc(kv.value, 0)
+       allProps += (LabelMeta.degree -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+     } else {
+       val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label)
+       props.foreach { case (k, v) =>
+         allProps += (k -> InnerValLikeWithTs(v, version))
+       }
+     }
+
+     val _mergedProps = allProps.result()
+     val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match {
+       case None =>
+         val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer)
+         val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
+         (mergedProps, tsInnerVal)
+       case Some(tsInnerVal) =>
+         (_mergedProps, tsInnerVal)
+     }
+//     val mergedProps =
+//       if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps
+//            else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
+
+     /** process tgtVertexId */
+     val tgtVertexId =
+       mergedProps.get(LabelMeta.to) match {
+         case None => tgtVertexIdRaw
+         case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+       }
+
+     IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
+
+   }
+ }
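
The wide deserializer now carries LabelMeta objects instead of raw seq bytes, resolving them through the label (label.labelMetaMap in the shared helpers, checkLabel.getOrElse(Label.findById(...)) above). A toy illustration of that resolution step; SimpleLabelMeta and metaBySeq are illustrative stand-ins, not s2graph API:

    case class SimpleLabelMeta(seq: Byte, name: String)

    object MetaResolutionSketch {
      // stand-in for label.labelMetaMap: seq byte -> meta
      val metaBySeq: Map[Byte, SimpleLabelMeta] = Map(
        0.toByte -> SimpleLabelMeta(0, "timestamp"),
        1.toByte -> SimpleLabelMeta(1, "weight")
      )

      // before: (Byte, value) pairs; after: (LabelMeta, value) pairs
      def decode(seqBytes: Array[Byte]): Array[SimpleLabelMeta] =
        seqBytes.map(metaBySeq) // throws on an unknown seq, like labelMetaMap

      def main(args: Array[String]): Unit =
        println(decode(Array[Byte](0, 1)).map(_.name).mkString(", ")) // timestamp, weight
    }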

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index c700e53..211b159 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -24,12 +24,13 @@ import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.s2graph.core.storage.StorageSerializable._
 
-class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byte] = longToBytes) extends Serializable[IndexEdge] {
    import StorageSerializable._
 
-   override val ts = indexEdge.version
-   override val table = indexEdge.label.hbaseTableName.getBytes()
+   override def ts = indexEdge.version
+   override def table = indexEdge.label.hbaseTableName.getBytes()
 
    def idxPropsMap = indexEdge.orders.toMap
    def idxPropsBytes = propsToBytes(indexEdge.orders)
@@ -49,7 +50,7 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
        if (indexEdge.op == GraphUtil.operations("incrementCount")) {
          Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
        } else {
-         idxPropsMap.get(LabelMeta.toSeq) match {
+         idxPropsMap.get(LabelMeta.to) match {
            case None => Bytes.add(idxPropsBytes, tgtIdBytes)
            case Some(vId) => idxPropsBytes
          }
@@ -59,9 +60,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
 
   override def toValue: Array[Byte] =
     if (indexEdge.degreeEdge)
-      Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
+      longToBytes(indexEdge.props(LabelMeta.degree).innerVal.toString().toLong)
     else if (indexEdge.op == GraphUtil.operations("incrementCount"))
-      Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
+      longToBytes(indexEdge.props(LabelMeta.count).innerVal.toString().toLong)
     else propsToKeyValues(indexEdge.metas.toSeq)
 
  }
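
Both IndexEdgeSerializable variants now take longToBytes as a constructor parameter defaulting to the shared encoder, so tests or alternative storages can swap the byte encoding of degree/count values without touching the class. A self-contained sketch of the same injection pattern; the default below reproduces big-endian Bytes.toBytes(Long) behavior, which we assume matches StorageSerializable.longToBytes:

    import java.nio.ByteBuffer

    class CountSerializer(count: Long,
                          longToBytes: Long => Array[Byte] = CountSerializer.defaultLongToBytes) {
      def toValue: Array[Byte] = longToBytes(count)
    }

    object CountSerializer {
      // same encoding as HBase's Bytes.toBytes(long): 8 bytes, big-endian
      def defaultLongToBytes(l: Long): Array[Byte] =
        ByteBuffer.allocate(8).putLong(l).array()

      def main(args: Array[String]): Unit = {
        println(new CountSerializer(42L).toValue.toList)
        // a test can inject a stub encoder without touching HBase utilities:
        println(new CountSerializer(42L, l => Array(l.toByte)).toValue.toList)
      }
    }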

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index 368e3f3..91b8db1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -20,11 +20,11 @@
 package org.apache.s2graph.core.storage.serde.snapshotedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
 import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId}
-import org.apache.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.s2graph.core.{Edge, SnapshotEdge, Vertex}
 
 class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
 
@@ -34,7 +34,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
     (statusCode.toByte, op.toByte)
   }
 
-  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+  override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
                                                    _kvs: Seq[T],
                                                    version: String,
                                                    cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
@@ -42,9 +42,10 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
     assert(kvs.size == 1)
 
     val kv = kvs.head
-    val schemaVer = queryParam.label.schemaVersion
+    val label = checkLabel.get
+    val schemaVer = label.schemaVersion
     val cellVersion = kv.timestamp
-    /* rowKey */
+    /** rowKey */
     def parseRowV3(kv: SKeyValue, version: String) = {
       var pos = 0
       val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
@@ -64,13 +65,14 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
     val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
     val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
 
-    val (props, op, ts, statusCode, _pendingEdgeOpt) = {
+    val (props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) = {
       var pos = 0
       val (statusCode, op) = statusCodeWithOp(kv.value(pos))
       pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
       val kvsMap = props.toMap
-      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+      val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
+      val ts = tsInnerVal.toString.toLong
 
       pos = endAt
       val _pendingEdgeOpt =
@@ -80,24 +82,24 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
           pos += 1
           //          val versionNum = Bytes.toLong(kv.value, pos, 8)
           //          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
           pos = endAt
           val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
 
           val pendingEdge =
             Edge(Vertex(srcVertexId, cellVersion),
               Vertex(tgtVertexId, cellVersion),
-              labelWithDir, pendingEdgeOp,
+              label, labelWithDir.dir, pendingEdgeOp,
               cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+              statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
           Option(pendingEdge)
         }
 
-      (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+      (kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
     }
 
     SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
-      labelWithDir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+      label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
+      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
   }
 }
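
In the hunk above, the decoded timestamp's InnerVal is kept alongside its Long form, so the snapshot edge (and any pending edge) can carry tsInnerValOpt without re-parsing the bytes. A simplified sketch of that dual return; InnerValLite is an illustrative stand-in:

    final case class InnerValLite(value: Any) { override def toString = value.toString }

    object TsThreadingSketch {
      def extractTs(props: Map[String, InnerValLite]): (Long, InnerValLite) = {
        val tsInnerVal = props("timestamp")      // was: only .toString.toLong, value discarded
        (tsInnerVal.toString.toLong, tsInnerVal) // now: the Long plus the original value
      }

      def main(args: Array[String]): Unit =
        println(extractTs(Map("timestamp" -> InnerValLite(1479313873000L))))
    }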

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 4f7c17b..fc84469 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -29,8 +29,8 @@ import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair
 class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
   import StorageSerializable._
 
-  override val ts = snapshotEdge.version
-  override val table = snapshotEdge.label.hbaseTableName.getBytes()
+  override def ts = snapshotEdge.version
+  override def table = snapshotEdge.label.hbaseTableName.getBytes()
 
   def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
     val byte = (((statusCode << 4) | op).toByte)
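
statusCodeWithOp on both the serialize and deserialize sides packs two small values into one byte: statusCode in the high nibble, op in the low nibble. A round-trip sketch, assuming (as the bit layout implies) each value fits in four bits:

    object StatusOpByteSketch {
      def pack(statusCode: Byte, op: Byte): Byte =
        ((statusCode << 4) | op).toByte

      def unpack(b: Byte): (Byte, Byte) =
        (((b >> 4) & 0x0F).toByte, (b & 0x0F).toByte)

      def main(args: Array[String]): Unit = {
        val packed = pack(statusCode = 2, op = 3)
        println(unpack(packed)) // (2,3)
      }
    }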

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index 8aca2cf..8d95e77 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -20,11 +20,11 @@
 package org.apache.s2graph.core.storage.serde.snapshotedge.wide
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable}
 import org.apache.s2graph.core.types.TargetVertexId
-import org.apache.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.s2graph.core.{Edge, SnapshotEdge, Vertex}
 
 class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
 
@@ -34,7 +34,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
     (statusCode.toByte, op.toByte)
   }
 
-  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+  override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
                                                    _kvs: Seq[T],
                                                    version: String,
                                                    cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
@@ -42,21 +42,23 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
     assert(kvs.size == 1)
 
     val kv = kvs.head
-    val schemaVer = queryParam.label.schemaVersion
+    val label = checkLabel.get
+    val schemaVer = label.schemaVersion
     val cellVersion = kv.timestamp
 
     val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
       (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
     }.getOrElse(parseRow(kv, schemaVer))
 
-    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
+    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) = {
       val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
       var pos = 0
       val (statusCode, op) = statusCodeWithOp(kv.value(pos))
       pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
       val kvsMap = props.toMap
-      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+      val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
+      val ts = tsInnerVal.toString.toLong
 
       pos = endAt
       val _pendingEdgeOpt =
@@ -66,24 +68,24 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
           pos += 1
           //          val versionNum = Bytes.toLong(kv.value, pos, 8)
           //          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
           pos = endAt
           val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
 
           val pendingEdge =
             Edge(Vertex(srcVertexId, cellVersion),
               Vertex(tgtVertexId, cellVersion),
-              labelWithDir, pendingEdgeOp,
+              label, labelWithDir.dir, pendingEdgeOp,
               cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+              statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
           Option(pendingEdge)
         }
 
-      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
     }
 
     SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
-      labelWithDir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+      label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
+      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
   }
 }
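
Note the contrast with the index-edge path: both snapshot-edge deserializers call checkLabel.get, so the caller must always supply the label, while IndexEdgeDeserializable falls back to Label.findById on the labelId parsed from the row. A sketch of the two policies; findById below is a stand-in for the real lookup:

    object LabelResolutionSketch {
      case class Label(id: Int, schemaVersion: String)
      private val registry = Map(1 -> Label(1, "v3"))
      def findById(id: Int): Label = registry(id)

      // index-edge style: tolerate a missing label by resolving from the row
      def resolveOrLookup(checkLabel: Option[Label], labelId: Int): Label =
        checkLabel.getOrElse(findById(labelId))

      // snapshot-edge style: require it up front (checkLabel.get)
      def resolveRequired(checkLabel: Option[Label]): Label =
        checkLabel.getOrElse(throw new IllegalArgumentException("label is required"))

      def main(args: Array[String]): Unit = {
        println(resolveOrLookup(None, 1))
        println(resolveRequired(Some(findById(1))))
      }
    }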

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index 757ef1b..4ceb4a8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -34,8 +34,8 @@ import org.apache.s2graph.core.types.VertexId
 class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
   import StorageSerializable._
 
-  override val ts = snapshotEdge.version
-  override val table = snapshotEdge.label.hbaseTableName.getBytes()
+  override def ts = snapshotEdge.version
+  override def table = snapshotEdge.label.hbaseTableName.getBytes()
 
   def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
     val byte = (((statusCode << 4) | op).toByte)
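
The override val ts/table -> override def change recurs across every serializer in this commit. One plausible motivation (our assumption, not stated in the commit) is Scala's initialization order: a val overridden in a subclass can still be observed as null or 0 while a supertype initializer runs, whereas a def is evaluated on demand. A minimal reproduction:

    object ValVsDefInit {
      trait Ser {
        def name: String
        val greeting: String = "table=" + name // runs during trait initialization
      }

      class WithVal extends Ser { override val name: String = "edges" }
      class WithDef extends Ser { override def name: String = "edges" }

      def main(args: Array[String]): Unit = {
        println(new WithVal().greeting) // table=null  -- the val is not yet initialized
        println(new WithDef().greeting) // table=edges -- the def is safe here
      }
    }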

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
index 737c2a8..3ec17ab 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,18 +19,19 @@
 
 package org.apache.s2graph.core.storage.serde.vertex
 
-import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
 import org.apache.s2graph.core.{QueryParam, Vertex}
 
 import scala.collection.mutable.ListBuffer
 
-class VertexDeserializable extends Deserializable[Vertex] {
-  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                     _kvs: Seq[T],
-                                     version: String,
-                                     cacheElementOpt: Option[Vertex]): Vertex = {
+class VertexDeserializable(bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[Vertex] {
+  def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
+                                          _kvs: Seq[T],
+                                          version: String,
+                                          cacheElementOpt: Option[Vertex]): Vertex = {
 
     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
 
@@ -46,7 +47,7 @@ class VertexDeserializable extends Deserializable[Vertex] {
     } {
       val propKey =
         if (kv.qualifier.length == 1) kv.qualifier.head.toInt
-        else Bytes.toInt(kv.qualifier)
+        else bytesToInt(kv.qualifier, 0)
 
       val ts = kv.timestamp
       if (ts > maxTs) maxTs = ts
@@ -63,4 +64,3 @@ class VertexDeserializable extends Deserializable[Vertex] {
     Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
   }
 }
-
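
The vertex deserializer reads a property key from the qualifier in two shapes: a single byte is the seq itself, anything longer decodes as a 4-byte Int, now through the injectable bytesToInt. A standalone sketch; defaultBytesToInt mimics what we assume Bytes.toInt does (big-endian):

    import java.nio.ByteBuffer

    object PropKeySketch {
      def defaultBytesToInt(bytes: Array[Byte], offset: Int): Int =
        ByteBuffer.wrap(bytes, offset, 4).getInt

      def propKey(qualifier: Array[Byte],
                  bytesToInt: (Array[Byte], Int) => Int = defaultBytesToInt): Int =
        if (qualifier.length == 1) qualifier.head.toInt // 1-byte qualifier: the seq itself
        else bytesToInt(qualifier, 0)                   // otherwise: 4-byte big-endian Int

      def main(args: Array[String]): Unit = {
        println(propKey(Array[Byte](7)))                              // 7
        println(propKey(ByteBuffer.allocate(4).putInt(1234).array())) // 1234
      }
    }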

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
index 6bb162c..77bbb87 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,11 +19,12 @@
 
 package org.apache.s2graph.core.storage.serde.vertex
 
-import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.Vertex
+import org.apache.s2graph.core.storage.StorageSerializable._
 import org.apache.s2graph.core.storage.{SKeyValue, Serializable}
+import org.apache.s2graph.core.utils.logger
 
-case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
+case class VertexSerializable(vertex: Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[Vertex] {
 
   override val table = vertex.hbaseTableName.getBytes
   override val ts = vertex.ts
@@ -37,10 +38,10 @@ case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
   /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */
   override def toKeyValues: Seq[SKeyValue] = {
     val row = toRowKey
-    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
-    val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
+    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield intToBytes(k) -> v.bytes
+    val belongsTo = vertex.belongLabelIds.map { labelId => intToBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
     (base ++ belongsTo).map { case (qualifier, value) =>
       SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
     } toSeq
   }
-}
+}
\ No newline at end of file
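
Unlike edges, a vertex fans out into one SKeyValue per property, plus an empty-valued cell per belongs-to-label marker, with qualifiers now built by the injected intToBytes. A simplified sketch; SKV and toPropKey below are illustrative stand-ins for SKeyValue and Vertex.toPropKey:

    object VertexKeyValuesSketch {
      final case class SKV(qualifier: Array[Byte], value: Array[Byte])

      def intToBytes(i: Int): Array[Byte] =
        java.nio.ByteBuffer.allocate(4).putInt(i).array()

      // stand-in only; the real mapping lives in Vertex.toPropKey
      def toPropKey(labelId: Int): Int = -labelId

      def toKeyValues(props: Map[Int, Array[Byte]],
                      belongLabelIds: Seq[Int]): Seq[SKV] = {
        val base      = props.map { case (k, v) => SKV(intToBytes(k), v) }
        val belongsTo = belongLabelIds.map(id => SKV(intToBytes(toPropKey(id)), Array.empty[Byte]))
        (base ++ belongsTo).toSeq
      }

      def main(args: Array[String]): Unit =
        println(toKeyValues(Map(1 -> Array[Byte](42)), Seq(3)).size) // 2 cells for 1 prop + 1 label
    }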

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
index c03319d..b885bc6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
@@ -20,6 +20,7 @@
 package org.apache.s2graph.core.types
 
 import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.LabelMeta
 
 object HBaseType {
   val VERSION4 = "v4"
@@ -28,7 +29,7 @@ object HBaseType {
   val VERSION1 = "v1"
 //  val DEFAULT_VERSION = VERSION2
   val DEFAULT_VERSION = VERSION3
-  val EMPTY_SEQ_BYTE = Byte.MaxValue
+//  val EMPTY_SEQ_BYTE = Byte.MaxValue
   val DEFAULT_COL_ID = 0
   val bitsForDir = 2
   val maxBytes = Bytes.toBytes(Int.MaxValue)
@@ -100,7 +101,7 @@ object HBaseDeserializable {
     val kvs = new Array[(Byte, InnerValLike)](len)
     var i = 0
     while (i < len) {
-      val k = EMPTY_SEQ_BYTE
+      val k = LabelMeta.emptySeq
       val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
       pos += numOfBytesUsed
       kvs(i) = (k -> v)