You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/03/04 15:56:20 UTC
incubator-s2graph git commit: [S2GRAPH-53]: Refactor Storage to
decide which serializer/deserializer for IndexEdge/SnapshotEdge/Vertex.
Repository: incubator-s2graph
Updated Branches:
refs/heads/master e207f676f -> fd8119bc9
[S2GRAPH-53]: Refactor Storage to decide which serializer/deserializer for IndexEdge/SnapshotEdge/Vertex.
add serde package and change storage to contain compatability table.
JIRA:
[S2GRAPH-53] https://issues.apache.org/jira/browse/S2GRAPH-53
Pull Request:
Closes #37
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/fd8119bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/fd8119bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/fd8119bc
Branch: refs/heads/master
Commit: fd8119bc9dc1cabc07bcf8b7dc49258345f45a3e
Parents: e207f67
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Mar 4 23:49:30 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Mar 4 23:49:30 2016 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../core/storage/IndexEdgeDeserializable.scala | 128 -----------------
.../core/storage/IndexEdgeSerializable.scala | 58 --------
.../storage/SnapshotEdgeDeserializable.scala | 142 -------------------
.../core/storage/SnapshotEdgeSerializable.scala | 76 ----------
.../kakao/s2graph/core/storage/Storage.scala | 59 ++++++--
.../core/storage/VertexDeserializable.scala | 46 ------
.../core/storage/VertexSerializable.scala | 18 ---
.../tall/IndexEdgeDeserializable.scala | 132 +++++++++++++++++
.../indexedge/tall/IndexEdgeSerializable.scala | 53 +++++++
.../wide/IndexEdgeDeserializable.scala | 116 +++++++++++++++
.../indexedge/wide/IndexEdgeSerializable.scala | 53 +++++++
.../tall/SnapshotEdgeDeserializable.scala | 84 +++++++++++
.../tall/SnapshotEdgeSerializable.scala | 47 ++++++
.../wide/SnapshotEdgeDeserializable.scala | 70 +++++++++
.../wide/SnapshotEdgeSerializable.scala | 50 +++++++
.../serde/vertex/VertexDeserializable.scala | 46 ++++++
.../serde/vertex/VertexSerializable.scala | 20 +++
18 files changed, 725 insertions(+), 476 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 97434d6..a91718f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -37,6 +37,9 @@ Release 0.12.1 - unreleased
S2GRAPH-44: Provide cache for WhereParser on query (Committed by DOYUNG YOON).
+ S2GRAPH-53: Refactor Storage to decide which serializer/deserializer for IndexEdge/SnapshotEdge/Vertex
+ (Committed by DOYUNG YOON).
+
BUG FIXES
S2GRAPH-18: Query Option "interval" is Broken.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala
deleted file mode 100644
index 2190222..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.{CanSKeyValue, StorageDeserializable, SKeyValue}
-import com.kakao.s2graph.core.types._
-import org.apache.hadoop.hbase.util.Bytes
-import StorageDeserializable._
-
-class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
-
- import StorageDeserializable._
-
- type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
- type ValueRaw = (Array[(Byte, InnerValLike)], Int)
-
- private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-// val degree = Bytes.toLong(kv.value)
- val degree = bytesToLongFunc(kv.value, 0)
- val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
- val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
- (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
- }
-
- private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
- var qualifierLen = 0
- var pos = 0
- val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
- val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
- pos = endAt
- qualifierLen += endAt
- val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
- (HBaseType.defaultTgtVertexId, 0)
- } else {
- TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
- }
- qualifierLen += tgtVertexIdLen
- (props, endAt, tgtVertexId, tgtVertexIdLen)
- }
- val (op, opLen) =
- if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
- else (kv.qualifier(qualifierLen), 1)
-
- qualifierLen += opLen
-
- (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
- }
-
- private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
- val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
- (props, endAt)
- }
-
- private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
- (Array.empty[(Byte, InnerValLike)], 0)
- }
-
-
-
- /** version 1 and version 2 is same logic */
- override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
- _kvs: Seq[T],
- version: String,
- cacheElementOpt: Option[IndexEdge] = None): IndexEdge = {
- fromKeyValuesInnerOld(queryParam, _kvs, version, cacheElementOpt)
- }
-
- def fromKeyValuesInnerOld[T: CanSKeyValue](queryParam: QueryParam,
- _kvs: Seq[T],
- version: String,
- cacheElementOpt: Option[IndexEdge] = None): IndexEdge = {
- assert(_kvs.size == 1)
-
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
- val kv = kvs.head
- val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
- (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
- }.getOrElse(parseRow(kv, version))
-
- val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
- if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
- else parseQualifier(kv, version)
-
- val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
-// val countVal = Bytes.toLong(kv.value)
- val countVal = bytesToLongFunc(kv.value, 0)
- val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
- (dummyProps, 8)
- } else if (kv.qualifier.isEmpty) {
- parseDegreeValue(kv, version)
- } else {
- parseValue(kv, version)
- }
-
- val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
-
- // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size)
-
- val idxProps = for {
- (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
- } yield {
- if (k == LabelMeta.degreeSeq) k -> v
- else seq -> v
- }
-
- val idxPropsMap = idxProps.toMap
- val tgtVertexId = if (tgtVertexIdInQualifier) {
- idxPropsMap.get(LabelMeta.toSeq) match {
- case None => tgtVertexIdRaw
- case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
- }
- } else tgtVertexIdRaw
-
- val _mergedProps = (idxProps ++ props).toMap
- val mergedProps =
- if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
- else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
-
- // logger.error(s"$mergedProps")
- // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong
-
- val ts = kv.timestamp
- IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala
deleted file mode 100644
index 56f70b9..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.{StorageSerializable, SKeyValue}
-import com.kakao.s2graph.core.types.{HBaseType, VertexId}
-import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
-import org.apache.hadoop.hbase.util.Bytes
-
-case class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
-
- import StorageSerializable._
-
- val label = indexEdge.label
- val table = label.hbaseTableName.getBytes()
- val cf = Serializable.edgeCf
-
- val idxPropsMap = indexEdge.orders.toMap
- val idxPropsBytes = propsToBytes(indexEdge.orders)
-
- /** version 1 and version 2 share same code for serialize row key part */
- override def toKeyValues: Seq[SKeyValue] = {
- toKeyValuesInner
- }
- def toKeyValuesInner: Seq[SKeyValue] = {
- val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
- val labelWithDirBytes = indexEdge.labelWithDir.bytes
- val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
-
- val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
- // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
- val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
- val qualifier =
- if (indexEdge.degreeEdge) Array.empty[Byte]
- else {
- if (indexEdge.op == GraphUtil.operations("incrementCount")) {
- Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
- } else {
- idxPropsMap.get(LabelMeta.toSeq) match {
- case None => Bytes.add(idxPropsBytes, tgtIdBytes)
- case Some(vId) => idxPropsBytes
- }
- }
- }
-
-
- val value =
- if (indexEdge.degreeEdge)
- Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
- else if (indexEdge.op == GraphUtil.operations("incrementCount"))
- Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
- else propsToKeyValues(indexEdge.metas.toSeq)
-
- val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
-
- Seq(kv)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala
deleted file mode 100644
index d4f55f0..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
-import com.kakao.s2graph.core.storage.{CanSKeyValue, SKeyValue, StorageDeserializable}
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
-class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
-
- import StorageDeserializable._
-
- override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
- _kvs: Seq[T],
- version: String,
- cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
- queryParam.label.schemaVersion match {
- case HBaseType.VERSION2 | HBaseType.VERSION1 => fromKeyValuesInnerOld(queryParam, _kvs, version, cacheElementOpt)
- case _ => fromKeyValuesInnerV3(queryParam, _kvs, version, cacheElementOpt)
- }
- }
-
- def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
- val statusCode = byte >> 4
- val op = byte & ((1 << 4) - 1)
- (statusCode.toByte, op.toByte)
- }
-
- def fromKeyValuesInnerOld[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
- assert(kvs.size == 1)
-
- val kv = kvs.head
- val schemaVer = queryParam.label.schemaVersion
- val cellVersion = kv.timestamp
-
- val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
- (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
- }.getOrElse(parseRow(kv, schemaVer))
-
- val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
- val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
- var pos = 0
- val (statusCode, op) = statusCodeWithOp(kv.value(pos))
- pos += 1
- val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
- val kvsMap = props.toMap
- val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
-
- pos = endAt
- val _pendingEdgeOpt =
- if (pos == kv.value.length) None
- else {
- val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
- pos += 1
- // val versionNum = Bytes.toLong(kv.value, pos, 8)
- // pos += 8
- val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
- pos = endAt
- val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
-
- val pendingEdge =
- Edge(Vertex(srcVertexId, cellVersion),
- Vertex(tgtVertexId, cellVersion),
- labelWithDir, pendingEdgeOp,
- cellVersion, pendingEdgeProps.toMap,
- statusCode = pendingEdgeStatusCode, lockTs = lockTs)
- Option(pendingEdge)
- }
-
- (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
- }
-
- SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
- labelWithDir, op, cellVersion, props, statusCode = statusCode,
- pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
- }
-
- private def fromKeyValuesInnerV3[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
- assert(kvs.size == 1)
-
- val kv = kvs.head
- val schemaVer = queryParam.label.schemaVersion
- val cellVersion = kv.timestamp
- /** rowKey */
- def parseRowV3(kv: SKeyValue, version: String) = {
- var pos = 0
- val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
- pos += srcIdAndTgtIdLen
- val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
- pos += 4
- val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-
- val rowLen = srcIdAndTgtIdLen + 4 + 1
- (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen)
-
- }
- val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
- (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
- }.getOrElse(parseRowV3(kv, schemaVer))
-
- val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
- val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
-
- val (props, op, ts, statusCode, _pendingEdgeOpt) = {
- var pos = 0
- val (statusCode, op) = statusCodeWithOp(kv.value(pos))
- pos += 1
- val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
- val kvsMap = props.toMap
- val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
-
- pos = endAt
- val _pendingEdgeOpt =
- if (pos == kv.value.length) None
- else {
- val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
- pos += 1
- // val versionNum = Bytes.toLong(kv.value, pos, 8)
- // pos += 8
- val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
- pos = endAt
- val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
-
- val pendingEdge =
- Edge(Vertex(srcVertexId, cellVersion),
- Vertex(tgtVertexId, cellVersion),
- labelWithDir, pendingEdgeOp,
- cellVersion, pendingEdgeProps.toMap,
- statusCode = pendingEdgeStatusCode, lockTs = lockTs)
- Option(pendingEdge)
- }
-
- (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
- }
-
- SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
- labelWithDir, op, cellVersion, props, statusCode = statusCode,
- pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala
deleted file mode 100644
index 9e6e1b7..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.SnapshotEdge
-import com.kakao.s2graph.core.mysqls.LabelIndex
-import com.kakao.s2graph.core.types.{HBaseType, SourceAndTargetVertexIdPair, VertexId}
-import org.apache.hadoop.hbase.util.Bytes
-
-
-class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
- import StorageSerializable._
-
- val label = snapshotEdge.label
- val table = label.hbaseTableName.getBytes()
- val cf = Serializable.edgeCf
-
- def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
- val byte = (((statusCode << 4) | op).toByte)
- Array.fill(1)(byte.toByte)
- }
- def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
- propsToKeyValuesWithTs(snapshotEdge.props.toList))
-
- override def toKeyValues: Seq[SKeyValue] = {
- label.schemaVersion match {
- case HBaseType.VERSION1 | HBaseType.VERSION2 => toKeyValuesInner
- case _ => toKeyValuesInnerV3
- }
- }
-
- private def toKeyValuesInner: Seq[SKeyValue] = {
- val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes
- val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
- val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
-
- val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
- val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
-
- val qualifier = tgtIdBytes
-
- val value = snapshotEdge.pendingEdgeOpt match {
- case None => valueBytes()
- case Some(pendingEdge) =>
- val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
- val versionBytes = Array.empty[Byte]
- val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
- val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
- Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
- }
- val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
- Seq(kv)
- }
-
- private def toKeyValuesInnerV3: Seq[SKeyValue] = {
- val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
- val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
- val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
-
- val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-
- val qualifier = Array.empty[Byte]
-
- val value = snapshotEdge.pendingEdgeOpt match {
- case None => valueBytes()
- case Some(pendingEdge) =>
- val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
- val versionBytes = Array.empty[Byte]
- val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
- val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
-
- Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
- }
-
- val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
- Seq(kv)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
index bca8df3..cc8f13c 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
@@ -4,6 +4,10 @@ import com.kakao.s2graph.core.ExceptionHandler.{Key, Val, KafkaMessage}
import com.kakao.s2graph.core.GraphExceptions.FetchTimeoutException
import com.kakao.s2graph.core._
import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.storage.serde._
+import com.kakao.s2graph.core.storage.serde.snapshotedge.tall
+import com.kakao.s2graph.core.storage.serde.snapshotedge.wide.SnapshotEdgeDeserializable
+import com.kakao.s2graph.core.storage.serde.vertex._
import com.kakao.s2graph.core.types._
import com.kakao.s2graph.core.utils.{Extensions, logger}
import com.typesafe.config.Config
@@ -16,6 +20,8 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Random, Try}
abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
+ import HBaseType._
+
/** storage dependent configurations */
val MaxRetryNum = config.getInt("max.retry.number")
val MaxBackOff = config.getInt("max.back.off")
@@ -26,6 +32,14 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
val expireAfterWrite = config.getInt("future.cache.expire.after.write")
val expireAfterAccess = config.getInt("future.cache.expire.after.access")
+ /**
+ * Compatibility table
+ * | label schema version | snapshot edge | index edge | vertex | note |
+ * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue |
+ * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue |
+ * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | recommended with HBase. current stable schema |
+ *
+ */
/**
* create serializer that knows how to convert given snapshotEdge into kvs: Seq[SKeyValue]
@@ -33,14 +47,26 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
* @param snapshotEdge: snapshotEdge to serialize
* @return serializer implementation for StorageSerializable which has toKeyValues return Seq[SKeyValue]
*/
- def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = new SnapshotEdgeSerializable(snapshotEdge)
+ def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = {
+ snapshotEdge.schemaVer match {
+ case VERSION1 | VERSION2 => new serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
+ case VERSION3 => new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
+ case _ => throw new RuntimeException(s"not supported version: ${snapshotEdge.schemaVer}")
+ }
+ }
/**
* create serializer that knows how to convert given indexEdge into kvs: Seq[SKeyValue]
- * @param indexedEdge: indexEdge to serialize
+ * @param indexEdge: indexEdge to serialize
* @return serializer implementation
*/
- def indexEdgeSerializer(indexedEdge: IndexEdge) = new IndexEdgeSerializable(indexedEdge)
+ def indexEdgeSerializer(indexEdge: IndexEdge) = {
+ indexEdge.schemaVer match {
+ case VERSION1 | VERSION2 | VERSION3 => new indexedge.wide.IndexEdgeSerializable(indexEdge)
+ case _ => throw new RuntimeException(s"not supported version: ${indexEdge.schemaVer}")
+
+ }
+ }
/**
* create serializer that knows how to convert given vertex into kvs: Seq[SKeyValue]
@@ -58,10 +84,24 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
* if any storaage use different class to represent stored byte array,
* then that storage implementation is responsible to provide implicit type conversion method on CanSKeyValue.
* */
- val snapshotEdgeDeserializer = new SnapshotEdgeDeserializable
+
+ val snapshotEdgeDeserializers = Map(
+ VERSION1 -> new snapshotedge.wide.SnapshotEdgeDeserializable,
+ VERSION2 -> new snapshotedge.wide.SnapshotEdgeDeserializable,
+ VERSION3 -> new tall.SnapshotEdgeDeserializable
+ )
+ def snapshotEdgeDeserializer(schemaVer: String) =
+ snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
/** create deserializer that can parse stored CanSKeyValue into indexEdge. */
- val indexEdgeDeserializer = new IndexEdgeDeserializable
+ val indexEdgeDeserializers = Map(
+ VERSION1 -> new indexedge.wide.IndexEdgeDeserializable,
+ VERSION2 -> new indexedge.wide.IndexEdgeDeserializable,
+ VERSION3 -> new indexedge.wide.IndexEdgeDeserializable
+ )
+
+ def indexEdgeDeserializer(schemaVer: String) =
+ indexEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
/** create deserializer that can parser stored CanSKeyValue into vertex. */
val vertexDeserializer = new VertexDeserializable
@@ -587,7 +627,8 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
parentEdges: Seq[EdgeWithScore]): Option[Edge] = {
// logger.debug(s"toEdge: $kv")
try {
- val indexEdgeOpt = indexEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+ val schemaVer = queryParam.label.schemaVersion
+ val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges))
} catch {
case ex: Exception =>
@@ -602,7 +643,8 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
isInnerCall: Boolean,
parentEdges: Seq[EdgeWithScore]): Option[Edge] = {
// logger.debug(s"SnapshottoEdge: $kv")
- val snapshotEdgeOpt = snapshotEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+ val schemaVer = queryParam.label.schemaVersion
+ val snapshotEdgeOpt = snapshotEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
if (isInnerCall) {
snapshotEdgeOpt.flatMap { snapshotEdge =>
@@ -631,9 +673,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
else {
val first = kvs.head
val kv = first
+ val schemaVer = queryParam.label.schemaVersion
val cacheElementOpt =
if (queryParam.isSnapshotEdge) None
- else indexEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)
+ else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)
for {
kv <- kvs
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala
deleted file mode 100644
index 699981d..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
-import com.kakao.s2graph.core.{QueryParam, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
-import scala.collection.mutable.ListBuffer
-
-class VertexDeserializable extends Deserializable[Vertex] {
- def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
- _kvs: Seq[T],
- version: String,
- cacheElementOpt: Option[Vertex]): Vertex = {
-
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
- val kv = kvs.head
- val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
-
- var maxTs = Long.MinValue
- val propsMap = new collection.mutable.HashMap[Int, InnerValLike]
- val belongLabelIds = new ListBuffer[Int]
-
- for {
- kv <- kvs
- } {
- val propKey =
- if (kv.qualifier.length == 1) kv.qualifier.head.toInt
- else Bytes.toInt(kv.qualifier)
-
- val ts = kv.timestamp
- if (ts > maxTs) maxTs = ts
-
- if (Vertex.isLabelId(propKey)) {
- belongLabelIds += Vertex.toLabelId(propKey)
- } else {
- val v = kv.value
- val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
- propsMap += (propKey -> value)
- }
- }
- assert(maxTs != Long.MinValue)
- Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala
deleted file mode 100644
index bda909d..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.Vertex
-import org.apache.hadoop.hbase.util.Bytes
-
-case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
-
- val cf = Serializable.vertexCf
-
- override def toKeyValues: Seq[SKeyValue] = {
- val row = vertex.id.bytes
- val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
- val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
- (base ++ belongsTo).map { case (qualifier, value) =>
- SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
- } toSeq
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
new file mode 100644
index 0000000..014a5c9
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -0,0 +1,132 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.tall
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
+import com.kakao.s2graph.core.types._
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+
+class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
+ import StorageDeserializable._
+
+ type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
+ type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+
+ private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+ // val degree = Bytes.toLong(kv.value)
+ val degree = bytesToLongFunc(kv.value, 0)
+ val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
+ val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+ (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+ }
+
+ private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+ var qualifierLen = 0
+ var pos = 0
+ val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+ val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+ pos = endAt
+ qualifierLen += endAt
+ val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+ (HBaseType.defaultTgtVertexId, 0)
+ } else {
+ TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
+ }
+ qualifierLen += tgtVertexIdLen
+ (props, endAt, tgtVertexId, tgtVertexIdLen)
+ }
+ val (op, opLen) =
+ if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+ else (kv.qualifier(qualifierLen), 1)
+
+ qualifierLen += opLen
+
+ (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+ }
+
+ private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
+ val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+ (props, endAt)
+ }
+
+ private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
+ (Array.empty[(Byte, InnerValLike)], 0)
+ }
+
+ override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+ _kvs: Seq[T],
+ version: String,
+ cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+
+ assert(_kvs.size == 1)
+
+ val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+ val kv = kvs.head
+
+ // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
+ var pos = 0
+ val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version)
+ pos += srcIdLen
+ val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+ pos += 4
+ val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+ pos += 1
+
+ val op = kv.row(pos)
+ pos += 1
+
+ if (pos == kv.row.length) {
+ // degree
+ // val degreeVal = Bytes.toLong(kv.value)
+ val degreeVal = bytesToLongFunc(kv.value, 0)
+ val ts = kv.timestamp
+ val props = Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, version),
+ LabelMeta.degreeSeq -> InnerVal.withLong(degreeVal, version))
+ val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+ IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
+ } else {
+ // not degree edge
+ val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+ val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version)
+ pos = endAt
+ val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
+ (HBaseType.defaultTgtVertexId, 0)
+ } else {
+ TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version)
+ }
+
+ val idxProps = for {
+ (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+ } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v
+
+ val idxPropsMap = idxProps.toMap
+
+ val tgtVertexId =
+ idxPropsMap.get(LabelMeta.toSeq) match {
+ case None => tgtVertexIdRaw
+ case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+ }
+
+ val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+ // val countVal = Bytes.toLong(kv.value)
+ val countVal = bytesToLongFunc(kv.value, 0)
+ val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
+ (dummyProps, 8)
+ } else {
+ bytesToKeyValues(kv.value, 0, kv.value.length, version)
+ }
+
+ val _mergedProps = (idxProps ++ props).toMap
+ val mergedProps =
+ if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+ else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
+
+ val ts = kv.timestamp
+ IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
+
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
new file mode 100644
index 0000000..46ad15f
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -0,0 +1,53 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.tall
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.VertexId
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.hadoop.hbase.util.Bytes
+
+
+class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+ import StorageSerializable._
+
+ val label = indexEdge.label
+ val table = label.hbaseTableName.getBytes()
+ val cf = Serializable.edgeCf
+
+ val idxPropsMap = indexEdge.orders.toMap
+ val idxPropsBytes = propsToBytes(indexEdge.orders)
+
+ override def toKeyValues: Seq[SKeyValue] = {
+ val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+ val labelWithDirBytes = indexEdge.labelWithDir.bytes
+ val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+
+ val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+ // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
+
+ val qualifier =
+ if (indexEdge.degreeEdge) Array.empty[Byte]
+ else
+ idxPropsMap.get(LabelMeta.toSeq) match {
+ case None => Bytes.add(idxPropsBytes, VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes)
+ case Some(vId) => idxPropsBytes
+ }
+
+ /** TODO search usage of op byte. if there is no, then remove opByte */
+ val rowBytes = Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier)
+ // val qualifierBytes = Array.fill(1)(indexEdge.op)
+ val qualifierBytes = Array.empty[Byte]
+
+ val value =
+ if (indexEdge.degreeEdge)
+ Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
+ else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+ Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
+ else propsToKeyValues(indexEdge.metas.toSeq)
+
+ val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version)
+
+ // logger.debug(s"[Ser]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
+ Seq(kv)
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
new file mode 100644
index 0000000..f83dd1f
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -0,0 +1,116 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.wide
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
+import com.kakao.s2graph.core.types._
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+
+class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
+ import StorageDeserializable._
+
+ type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
+ type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+
+ private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+ // val degree = Bytes.toLong(kv.value)
+ val degree = bytesToLongFunc(kv.value, 0)
+ val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
+ val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+ (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+ }
+
+ private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+ var qualifierLen = 0
+ var pos = 0
+ val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+ val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+ pos = endAt
+ qualifierLen += endAt
+ val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+ (HBaseType.defaultTgtVertexId, 0)
+ } else {
+ TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
+ }
+ qualifierLen += tgtVertexIdLen
+ (props, endAt, tgtVertexId, tgtVertexIdLen)
+ }
+ val (op, opLen) =
+ if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+ else (kv.qualifier(qualifierLen), 1)
+
+ qualifierLen += opLen
+
+ (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+ }
+
+ private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
+ val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+ (props, endAt)
+ }
+
+ private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
+ (Array.empty[(Byte, InnerValLike)], 0)
+ }
+
+ override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+ _kvs: Seq[T],
+ version: String,
+ cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+ assert(_kvs.size == 1)
+
+ val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+ val kv = kvs.head
+ val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
+ (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+ }.getOrElse(parseRow(kv, version))
+
+ val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
+ if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
+ else parseQualifier(kv, version)
+
+ val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+ // val countVal = Bytes.toLong(kv.value)
+ val countVal = bytesToLongFunc(kv.value, 0)
+ val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
+ (dummyProps, 8)
+ } else if (kv.qualifier.isEmpty) {
+ parseDegreeValue(kv, version)
+ } else {
+ parseValue(kv, version)
+ }
+
+ val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+
+ // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size)
+
+ val idxProps = for {
+ (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+ } yield {
+ if (k == LabelMeta.degreeSeq) k -> v
+ else seq -> v
+ }
+
+ val idxPropsMap = idxProps.toMap
+ val tgtVertexId = if (tgtVertexIdInQualifier) {
+ idxPropsMap.get(LabelMeta.toSeq) match {
+ case None => tgtVertexIdRaw
+ case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+ }
+ } else tgtVertexIdRaw
+
+ val _mergedProps = (idxProps ++ props).toMap
+ val mergedProps =
+ if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+ else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
+
+ // logger.error(s"$mergedProps")
+ // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong
+
+ val ts = kv.timestamp
+ IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
+
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
new file mode 100644
index 0000000..716b6fb
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -0,0 +1,53 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.wide
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.VertexId
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.hadoop.hbase.util.Bytes
+
+
+class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+ import StorageSerializable._
+
+ val label = indexEdge.label
+ val table = label.hbaseTableName.getBytes()
+ val cf = Serializable.edgeCf
+
+ val idxPropsMap = indexEdge.orders.toMap
+ val idxPropsBytes = propsToBytes(indexEdge.orders)
+
+ override def toKeyValues: Seq[SKeyValue] = {
+ val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+ val labelWithDirBytes = indexEdge.labelWithDir.bytes
+ val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+
+ val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+ // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
+ val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
+ val qualifier =
+ if (indexEdge.degreeEdge) Array.empty[Byte]
+ else {
+ if (indexEdge.op == GraphUtil.operations("incrementCount")) {
+ Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
+ } else {
+ idxPropsMap.get(LabelMeta.toSeq) match {
+ case None => Bytes.add(idxPropsBytes, tgtIdBytes)
+ case Some(vId) => idxPropsBytes
+ }
+ }
+ }
+
+
+ val value =
+ if (indexEdge.degreeEdge)
+ Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
+ else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+ Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
+ else propsToKeyValues(indexEdge.metas.toSeq)
+
+ val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
+
+ Seq(kv)
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
new file mode 100644
index 0000000..c97bed6
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -0,0 +1,84 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.tall
+
+import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue}
+import com.kakao.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId}
+import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+
+class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
+
+ def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
+ val statusCode = byte >> 4
+ val op = byte & ((1 << 4) - 1)
+ (statusCode.toByte, op.toByte)
+ }
+
+ override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+ _kvs: Seq[T],
+ version: String,
+ cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
+ val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+ assert(kvs.size == 1)
+
+ val kv = kvs.head
+ val schemaVer = queryParam.label.schemaVersion
+ val cellVersion = kv.timestamp
+ /** rowKey */
+ def parseRowV3(kv: SKeyValue, version: String) = {
+ var pos = 0
+ val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
+ pos += srcIdAndTgtIdLen
+ val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+ pos += 4
+ val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+
+ val rowLen = srcIdAndTgtIdLen + 4 + 1
+ (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen)
+
+ }
+ val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
+ (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
+ }.getOrElse(parseRowV3(kv, schemaVer))
+
+ val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
+ val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
+
+ val (props, op, ts, statusCode, _pendingEdgeOpt) = {
+ var pos = 0
+ val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+ pos += 1
+ val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+ val kvsMap = props.toMap
+ val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+ pos = endAt
+ val _pendingEdgeOpt =
+ if (pos == kv.value.length) None
+ else {
+ val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
+ pos += 1
+ // val versionNum = Bytes.toLong(kv.value, pos, 8)
+ // pos += 8
+ val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+ pos = endAt
+ val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+
+ val pendingEdge =
+ Edge(Vertex(srcVertexId, cellVersion),
+ Vertex(tgtVertexId, cellVersion),
+ labelWithDir, pendingEdgeOp,
+ cellVersion, pendingEdgeProps.toMap,
+ statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+ Option(pendingEdge)
+ }
+
+ (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+ }
+
+ SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+ labelWithDir, op, cellVersion, props, statusCode = statusCode,
+ pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
new file mode 100644
index 0000000..a507b90
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -0,0 +1,47 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.tall
+
+import com.kakao.s2graph.core.SnapshotEdge
+import com.kakao.s2graph.core.mysqls.LabelIndex
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.SourceAndTargetVertexIdPair
+import org.apache.hadoop.hbase.util.Bytes
+
+
+class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
+ import StorageSerializable._
+
+ val label = snapshotEdge.label
+ val table = label.hbaseTableName.getBytes()
+ val cf = Serializable.edgeCf
+
+ def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
+ val byte = (((statusCode << 4) | op).toByte)
+ Array.fill(1)(byte.toByte)
+ }
+ def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
+ propsToKeyValuesWithTs(snapshotEdge.props.toList))
+
+ override def toKeyValues: Seq[SKeyValue] = {
+ val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
+ val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
+ val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
+
+ val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
+ val qualifier = Array.empty[Byte]
+
+ val value = snapshotEdge.pendingEdgeOpt match {
+ case None => valueBytes()
+ case Some(pendingEdge) =>
+ val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
+ val versionBytes = Array.empty[Byte]
+ val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
+ val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
+
+ Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
+ }
+
+ val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
+ Seq(kv)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
new file mode 100644
index 0000000..1174f50
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -0,0 +1,70 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.wide
+
+import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable}
+import com.kakao.s2graph.core.types.TargetVertexId
+import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+
+class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
+
+ def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
+ val statusCode = byte >> 4
+ val op = byte & ((1 << 4) - 1)
+ (statusCode.toByte, op.toByte)
+ }
+
+ override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+ _kvs: Seq[T],
+ version: String,
+ cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
+ val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+ assert(kvs.size == 1)
+
+ val kv = kvs.head
+ val schemaVer = queryParam.label.schemaVersion
+ val cellVersion = kv.timestamp
+
+ val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
+ (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
+ }.getOrElse(parseRow(kv, schemaVer))
+
+ val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
+ val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
+ var pos = 0
+ val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+ pos += 1
+ val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+ val kvsMap = props.toMap
+ val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+ pos = endAt
+ val _pendingEdgeOpt =
+ if (pos == kv.value.length) None
+ else {
+ val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
+ pos += 1
+ // val versionNum = Bytes.toLong(kv.value, pos, 8)
+ // pos += 8
+ val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+ pos = endAt
+ val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+
+ val pendingEdge =
+ Edge(Vertex(srcVertexId, cellVersion),
+ Vertex(tgtVertexId, cellVersion),
+ labelWithDir, pendingEdgeOp,
+ cellVersion, pendingEdgeProps.toMap,
+ statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+ Option(pendingEdge)
+ }
+
+ (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+ }
+
+ SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+ labelWithDir, op, cellVersion, props, statusCode = statusCode,
+ pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
new file mode 100644
index 0000000..e6074d9
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -0,0 +1,50 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.wide
+
+import com.kakao.s2graph.core.SnapshotEdge
+import com.kakao.s2graph.core.mysqls.LabelIndex
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.VertexId
+import org.apache.hadoop.hbase.util.Bytes
+
+
+/**
+ * this class serialize
+ * @param snapshotEdge
+ */
+class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
+ import StorageSerializable._
+
+ val label = snapshotEdge.label
+ val table = label.hbaseTableName.getBytes()
+ val cf = Serializable.edgeCf
+
+ def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
+ val byte = (((statusCode << 4) | op).toByte)
+ Array.fill(1)(byte.toByte)
+ }
+ def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
+ propsToKeyValuesWithTs(snapshotEdge.props.toList))
+
+ override def toKeyValues: Seq[SKeyValue] = {
+ val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes
+ val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
+ val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
+
+ val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+ val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
+
+ val qualifier = tgtIdBytes
+
+ val value = snapshotEdge.pendingEdgeOpt match {
+ case None => valueBytes()
+ case Some(pendingEdge) =>
+ val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
+ val versionBytes = Array.empty[Byte]
+ val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
+ val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
+ Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
+ }
+ val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
+ Seq(kv)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
new file mode 100644
index 0000000..e355401
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -0,0 +1,46 @@
+package com.kakao.s2graph.core.storage.serde.vertex
+
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable}
+import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
+import com.kakao.s2graph.core.{QueryParam, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+import scala.collection.mutable.ListBuffer
+
+class VertexDeserializable extends Deserializable[Vertex] {
+ def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+ _kvs: Seq[T],
+ version: String,
+ cacheElementOpt: Option[Vertex]): Vertex = {
+
+ val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+ val kv = kvs.head
+ val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
+
+ var maxTs = Long.MinValue
+ val propsMap = new collection.mutable.HashMap[Int, InnerValLike]
+ val belongLabelIds = new ListBuffer[Int]
+
+ for {
+ kv <- kvs
+ } {
+ val propKey =
+ if (kv.qualifier.length == 1) kv.qualifier.head.toInt
+ else Bytes.toInt(kv.qualifier)
+
+ val ts = kv.timestamp
+ if (ts > maxTs) maxTs = ts
+
+ if (Vertex.isLabelId(propKey)) {
+ belongLabelIds += Vertex.toLabelId(propKey)
+ } else {
+ val v = kv.value
+ val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
+ propsMap += (propKey -> value)
+ }
+ }
+ assert(maxTs != Long.MinValue)
+ Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
new file mode 100644
index 0000000..0c17592
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -0,0 +1,20 @@
+package com.kakao.s2graph.core.storage.serde.vertex
+
+import com.kakao.s2graph.core.Vertex
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable}
+import org.apache.hadoop.hbase.util.Bytes
+
+
+case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
+
+ val cf = Serializable.vertexCf
+
+ override def toKeyValues: Seq[SKeyValue] = {
+ val row = vertex.id.bytes
+ val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
+ val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
+ (base ++ belongsTo).map { case (qualifier, value) =>
+ SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
+ } toSeq
+ }
+}