You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/06/11 15:00:53 UTC
incubator-s2graph git commit: [S2GRAPH-81]: Separate Serializable's
toKeyValues into 3, toRowKey, toQualifier,
toValue split toKeyValues into toRowKey, toQualifier, toValue,
so buildRequest only use toRowKey, toQualifier.
Repository: incubator-s2graph
Updated Branches:
refs/heads/master 56829adc7 -> 0d1854450
[S2GRAPH-81]: Separate Serializable's toKeyValues into 3, toRowKey, toQualifier, toValue
split toKeyValues into toRowKey, toQualifier, toValue, so buildRequest only use toRowKey, toQualifier.
JIRA:
[S2GRAPH-81] https://issues.apache.org/jira/browse/S2GRAPH-81
Pull Request:
Closes #52
Authors:
DOYUNG YOON: steamshon@apache.org
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/0d185445
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/0d185445
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/0d185445
Branch: refs/heads/master
Commit: 0d185445056555593b524b02f689aecb9d68906a
Parents: 56829ad
Author: DO YUNG YOON <st...@apache.org>
Authored: Sat Jun 11 23:58:51 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Jun 11 23:58:51 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../core/storage/StorageSerializable.scala | 18 +-
.../core/storage/hbase/AsynchbaseStorage.scala | 19 +-
.../tall/IndexEdgeDeserializable.scala | 171 ++++++++-------
.../indexedge/tall/IndexEdgeSerializable.scala | 33 ++-
.../wide/IndexEdgeDeserializable.scala | 214 ++++++++++---------
.../indexedge/wide/IndexEdgeSerializable.scala | 56 +++--
.../tall/SnapshotEdgeSerializable.scala | 18 +-
.../wide/SnapshotEdgeSerializable.scala | 22 +-
.../serde/vertex/VertexSerializable.scala | 12 +-
10 files changed, 290 insertions(+), 275 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 56f14a0..7bcc7ef 100644
--- a/CHANGES
+++ b/CHANGES
@@ -135,6 +135,8 @@ Release 0.12.1 - unreleased
S2GRAPH-75: Use an embedded database as the default metadata storage.
(Contributed by Jong Wook Kim<jo...@nyu.edu>, committed by DOYUNG YOON)
+ S2GRAPH-81: Separate Serializable's toKeyValues into 3, toRowKey, toQualifier, toValue. (Committed by DOYUNG YOON).
+
TEST
S2GRAPH-21: Change PostProcessBenchmarkSpec not to store and fetch test data from storage. (Committed by DOYUNG YOON).
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
index b6435e4..b7326f5 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
@@ -56,5 +56,21 @@ object StorageSerializable {
}
trait StorageSerializable[E] {
- def toKeyValues: Seq[SKeyValue]
+ val cf = Serializable.edgeCf
+
+ val table: Array[Byte]
+ val ts: Long
+
+ def toRowKey: Array[Byte]
+ def toQualifier: Array[Byte]
+ def toValue: Array[Byte]
+
+ def toKeyValues: Seq[SKeyValue] = {
+ val row = toRowKey
+ val qualifier = toQualifier
+ val value = toValue
+ val kv = SKeyValue(table, row, cf, qualifier, value, ts)
+
+ Seq(kv)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 66a1be4..4bd222f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -181,18 +181,17 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
val label = queryParam.label
val edge = toRequestEdge(queryRequest)
- val kv = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+ val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
val snapshotEdge = edge.toSnapshotEdge
- snapshotEdgeSerializer(snapshotEdge).toKeyValues.head
- // new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
+ snapshotEdgeSerializer(snapshotEdge)
} else {
- val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == queryParam.labelOrderSeq)
- assert(indexedEdgeOpt.isDefined)
-
- val indexedEdge = indexedEdgeOpt.get
- indexEdgeSerializer(indexedEdge).toKeyValues.head
+ val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.labelWithDir,
+ edge.op, edge.version, queryParam.labelOrderSeq, edge.propsWithTs)
+ indexEdgeSerializer(indexEdge)
}
+ val (rowKey, qualifier) = (serializer.toRowKey, serializer.toQualifier)
+
val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
label.schemaVersion match {
@@ -246,8 +245,8 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
scanner
case _ =>
val get =
- if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
- else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf)
+ if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
+ else new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
get.maxVersions(1)
get.setFailfast(true)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index e80a805..e6265f7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -27,8 +27,6 @@ import org.apache.s2graph.core.types._
import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
import scala.collection.immutable
-import scala.collection.immutable
-
class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
import StorageDeserializable._
@@ -76,88 +74,87 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
(Array.empty[(Byte, InnerValLike)], 0)
}
- override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
- _kvs: Seq[T],
- schemaVer: String,
- cacheElementOpt: Option[IndexEdge]): IndexEdge = {
-
- assert(_kvs.size == 1)
-
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
- val kv = kvs.head
- val version = kv.timestamp
- // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
- var pos = 0
- val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer)
- pos += srcIdLen
- val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
- pos += 4
- val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
- pos += 1
-
- val op = kv.row(pos)
- pos += 1
-
- if (pos == kv.row.length) {
- // degree
- // val degreeVal = Bytes.toLong(kv.value)
- val degreeVal = bytesToLongFunc(kv.value, 0)
- val ts = kv.timestamp
- val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer),
- LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer))
- val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
- IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
- } else {
- // not degree edge
-
-
- val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
- pos = endAt
- val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
- (HBaseType.defaultTgtVertexId, 0)
- } else {
- TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer)
- }
-
-
- val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
- val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
- /** process indexProps */
- for {
- (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
- } {
- if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
- else allProps += seq -> InnerValLikeWithTs(v, version)
- }
-
- /** process props */
- if (op == GraphUtil.operations("incrementCount")) {
- // val countVal = Bytes.toLong(kv.value)
- val countVal = bytesToLongFunc(kv.value, 0)
- allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
- } else {
- val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
- props.foreach { case (k, v) =>
- allProps += (k -> InnerValLikeWithTs(v, version))
- }
- }
- val _mergedProps = allProps.result()
- val mergedProps =
- if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
- else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-
- /** process tgtVertexId */
- val tgtVertexId =
- mergedProps.get(LabelMeta.toSeq) match {
- case None => tgtVertexIdRaw
- case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
- }
-
-
- IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
-
- }
- }
- }
+ override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+ _kvs: Seq[T],
+ schemaVer: String,
+ cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+
+ assert(_kvs.size == 1)
+
+ val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+ val kv = kvs.head
+ val version = kv.timestamp
+ // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
+ var pos = 0
+ val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer)
+ pos += srcIdLen
+ val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+ pos += 4
+ val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+ pos += 1
+
+ val op = kv.row(pos)
+ pos += 1
+
+ if (pos == kv.row.length) {
+ // degree
+ // val degreeVal = Bytes.toLong(kv.value)
+ val degreeVal = bytesToLongFunc(kv.value, 0)
+ val ts = kv.timestamp
+ val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer),
+ LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer))
+ val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
+ IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
+ } else {
+ // not degree edge
+
+
+ val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
+ pos = endAt
+ val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
+ (HBaseType.defaultTgtVertexId, 0)
+ } else {
+ TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer)
+ }
+
+ val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
+ val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+ /** process indexProps */
+ for {
+ (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+ } {
+ if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
+ else allProps += seq -> InnerValLikeWithTs(v, version)
+ }
+
+ /** process props */
+ if (op == GraphUtil.operations("incrementCount")) {
+ // val countVal = Bytes.toLong(kv.value)
+ val countVal = bytesToLongFunc(kv.value, 0)
+ allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+ } else {
+ val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
+ props.foreach { case (k, v) =>
+ allProps += (k -> InnerValLikeWithTs(v, version))
+ }
+ }
+ val _mergedProps = allProps.result()
+ val mergedProps =
+ if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+ else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
+
+ /** process tgtVertexId */
+ val tgtVertexId =
+ mergedProps.get(LabelMeta.toSeq) match {
+ case None => tgtVertexIdRaw
+ case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+ }
+
+
+ IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
index d00877e..f17e41c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -29,14 +29,13 @@ import org.apache.s2graph.core.{GraphUtil, IndexEdge}
class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
import StorageSerializable._
- val label = indexEdge.label
- val table = label.hbaseTableName.getBytes()
- val cf = Serializable.edgeCf
+ override val ts = indexEdge.version
+ override val table = indexEdge.label.hbaseTableName.getBytes()
- val idxPropsMap = indexEdge.orders.toMap
- val idxPropsBytes = propsToBytes(indexEdge.orders)
+ def idxPropsMap = indexEdge.orders.toMap
+ def idxPropsBytes = propsToBytes(indexEdge.orders)
- override def toKeyValues: Seq[SKeyValue] = {
+ override def toRowKey: Array[Byte] = {
val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
val labelWithDirBytes = indexEdge.labelWithDir.bytes
val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
@@ -53,20 +52,16 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
}
/** TODO search usage of op byte. if there is no, then remove opByte */
- val rowBytes = Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier)
- // val qualifierBytes = Array.fill(1)(indexEdge.op)
- val qualifierBytes = Array.empty[Byte]
+ Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier)
+ }
- val value =
- if (indexEdge.degreeEdge)
- Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
- else if (indexEdge.op == GraphUtil.operations("incrementCount"))
- Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
- else propsToKeyValues(indexEdge.metas.toSeq)
+ override def toQualifier: Array[Byte] = Array.empty[Byte]
- val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version)
+ override def toValue: Array[Byte] =
+ if (indexEdge.degreeEdge)
+ Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
+ else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+ Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
+ else propsToKeyValues(indexEdge.metas.toSeq)
- // logger.debug(s"[Ser]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
- Seq(kv)
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index b1b933f..5a8fa42 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -28,109 +28,113 @@ import scala.collection.immutable
import scala.collection.immutable
+import scala.collection.immutable
+
class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
- import StorageDeserializable._
-
- type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
- type ValueRaw = (Array[(Byte, InnerValLike)], Int)
-
- private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
- // val degree = Bytes.toLong(kv.value)
- val degree = bytesToLongFunc(kv.value, 0)
- val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
- val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
- (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
- }
-
- private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
- var qualifierLen = 0
- var pos = 0
- val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
- val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
- pos = endAt
- qualifierLen += endAt
- val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
- (HBaseType.defaultTgtVertexId, 0)
- } else {
- TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
- }
- qualifierLen += tgtVertexIdLen
- (props, endAt, tgtVertexId, tgtVertexIdLen)
- }
- val (op, opLen) =
- if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
- else (kv.qualifier(qualifierLen), 1)
-
- qualifierLen += opLen
-
- (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
- }
-
- private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
- val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
- (props, endAt)
- }
-
- private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
- (Array.empty[(Byte, InnerValLike)], 0)
- }
-
- override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
- _kvs: Seq[T],
- schemaVer: String,
- cacheElementOpt: Option[IndexEdge]): IndexEdge = {
- assert(_kvs.size == 1)
-
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
- val kv = kvs.head
- val version = kv.timestamp
-
- val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
- (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
- }.getOrElse(parseRow(kv, schemaVer))
-
- val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
- if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
- else parseQualifier(kv, schemaVer)
-
- val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
- val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
- /** process indexProps */
- for {
- (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
- } {
- if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
- else allProps += seq -> InnerValLikeWithTs(v, version)
- }
-
- /** process props */
- if (op == GraphUtil.operations("incrementCount")) {
- // val countVal = Bytes.toLong(kv.value)
- val countVal = bytesToLongFunc(kv.value, 0)
- allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
- } else if (kv.qualifier.isEmpty) {
- val countVal = bytesToLongFunc(kv.value, 0)
- allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
- } else {
- val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
- props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) }
- }
-
- val _mergedProps = allProps.result()
- val mergedProps =
- if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
- else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-
- /** process tgtVertexId */
- val tgtVertexId =
- mergedProps.get(LabelMeta.toSeq) match {
- case None => tgtVertexIdRaw
- case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
- }
-
- IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
-
- }
- }
+
+
+ import StorageDeserializable._
+
+ type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
+ type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+
+ private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+ // val degree = Bytes.toLong(kv.value)
+ val degree = bytesToLongFunc(kv.value, 0)
+ val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
+ val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+ (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+ }
+
+ private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+ var qualifierLen = 0
+ var pos = 0
+ val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+ val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+ pos = endAt
+ qualifierLen += endAt
+ val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+ (HBaseType.defaultTgtVertexId, 0)
+ } else {
+ TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
+ }
+ qualifierLen += tgtVertexIdLen
+ (props, endAt, tgtVertexId, tgtVertexIdLen)
+ }
+ val (op, opLen) =
+ if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+ else (kv.qualifier(qualifierLen), 1)
+
+ qualifierLen += opLen
+
+ (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+ }
+
+ private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
+ val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+ (props, endAt)
+ }
+
+ private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
+ (Array.empty[(Byte, InnerValLike)], 0)
+ }
+
+ override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+ _kvs: Seq[T],
+ schemaVer: String,
+ cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+ assert(_kvs.size == 1)
+
+ val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+ val kv = kvs.head
+ val version = kv.timestamp
+
+ val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
+ (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+ }.getOrElse(parseRow(kv, schemaVer))
+
+ val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
+ if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
+ else parseQualifier(kv, schemaVer)
+
+ val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
+ val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+ /** process indexProps */
+ for {
+ (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+ } {
+ if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
+ else allProps += seq -> InnerValLikeWithTs(v, version)
+ }
+
+ /** process props */
+ if (op == GraphUtil.operations("incrementCount")) {
+ // val countVal = Bytes.toLong(kv.value)
+ val countVal = bytesToLongFunc(kv.value, 0)
+ allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+ } else if (kv.qualifier.isEmpty) {
+ val countVal = bytesToLongFunc(kv.value, 0)
+ allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+ } else {
+ val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
+ props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) }
+ }
+
+ val _mergedProps = allProps.result()
+ val mergedProps =
+ if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+ else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
+
+ /** process tgtVertexId */
+ val tgtVertexId =
+ mergedProps.get(LabelMeta.toSeq) match {
+ case None => tgtVertexIdRaw
+ case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+ }
+
+ IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index 49e95b4..83d4338 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -28,44 +28,40 @@ import org.apache.s2graph.core.{GraphUtil, IndexEdge}
class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
import StorageSerializable._
- val label = indexEdge.label
- val table = label.hbaseTableName.getBytes()
- val cf = Serializable.edgeCf
+ override val ts = indexEdge.version
+ override val table = indexEdge.label.hbaseTableName.getBytes()
- val idxPropsMap = indexEdge.orders.toMap
- val idxPropsBytes = propsToBytes(indexEdge.orders)
+ def idxPropsMap = indexEdge.orders.toMap
+ def idxPropsBytes = propsToBytes(indexEdge.orders)
- override def toKeyValues: Seq[SKeyValue] = {
+ override def toRowKey: Array[Byte] = {
val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
val labelWithDirBytes = indexEdge.labelWithDir.bytes
val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
- val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
- // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
+ Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+ }
+
+ override def toQualifier: Array[Byte] = {
val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
- val qualifier =
- if (indexEdge.degreeEdge) Array.empty[Byte]
- else {
- if (indexEdge.op == GraphUtil.operations("incrementCount")) {
- Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
- } else {
- idxPropsMap.get(LabelMeta.toSeq) match {
- case None => Bytes.add(idxPropsBytes, tgtIdBytes)
- case Some(vId) => idxPropsBytes
- }
+ if (indexEdge.degreeEdge) Array.empty[Byte]
+ else {
+ if (indexEdge.op == GraphUtil.operations("incrementCount")) {
+ Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
+ } else {
+ idxPropsMap.get(LabelMeta.toSeq) match {
+ case None => Bytes.add(idxPropsBytes, tgtIdBytes)
+ case Some(vId) => idxPropsBytes
}
}
-
-
- val value =
- if (indexEdge.degreeEdge)
- Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
- else if (indexEdge.op == GraphUtil.operations("incrementCount"))
- Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
- else propsToKeyValues(indexEdge.metas.toSeq)
-
- val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
-
- Seq(kv)
+ }
}
+
+ override def toValue: Array[Byte] =
+ if (indexEdge.degreeEdge)
+ Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
+ else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+ Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
+ else propsToKeyValues(indexEdge.metas.toSeq)
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 716a6b9..4f7c17b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -29,9 +29,8 @@ import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair
class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
import StorageSerializable._
- val label = snapshotEdge.label
- val table = label.hbaseTableName.getBytes()
- val cf = Serializable.edgeCf
+ override val ts = snapshotEdge.version
+ override val table = snapshotEdge.label.hbaseTableName.getBytes()
def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
val byte = (((statusCode << 4) | op).toByte)
@@ -40,16 +39,18 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
propsToKeyValuesWithTs(snapshotEdge.props.toList))
- override def toKeyValues: Seq[SKeyValue] = {
+ override def toRowKey: Array[Byte] = {
val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
- val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+ Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+ }
- val qualifier = Array.empty[Byte]
+ override def toQualifier: Array[Byte] = Array.empty[Byte]
- val value = snapshotEdge.pendingEdgeOpt match {
+ override def toValue: Array[Byte] =
+ snapshotEdge.pendingEdgeOpt match {
case None => valueBytes()
case Some(pendingEdge) =>
val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
@@ -60,7 +61,4 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
}
- val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
- Seq(kv)
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index 2eb2b1b..757ef1b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -34,9 +34,8 @@ import org.apache.s2graph.core.types.VertexId
class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
import StorageSerializable._
- val label = snapshotEdge.label
- val table = label.hbaseTableName.getBytes()
- val cf = Serializable.edgeCf
+ override val ts = snapshotEdge.version
+ override val table = snapshotEdge.label.hbaseTableName.getBytes()
def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
val byte = (((statusCode << 4) | op).toByte)
@@ -45,17 +44,20 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
propsToKeyValuesWithTs(snapshotEdge.props.toList))
- override def toKeyValues: Seq[SKeyValue] = {
+
+ override def toRowKey: Array[Byte] = {
val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes
val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
- val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
- val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
+ Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+ }
- val qualifier = tgtIdBytes
+ override def toQualifier: Array[Byte] =
+ VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
- val value = snapshotEdge.pendingEdgeOpt match {
+ override def toValue: Array[Byte] =
+ snapshotEdge.pendingEdgeOpt match {
case None => valueBytes()
case Some(pendingEdge) =>
val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
@@ -64,7 +66,5 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
}
- val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
- Seq(kv)
- }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
index a74031a..6bb162c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -25,10 +25,18 @@ import org.apache.s2graph.core.storage.{SKeyValue, Serializable}
case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
- val cf = Serializable.vertexCf
+ override val table = vertex.hbaseTableName.getBytes
+ override val ts = vertex.ts
+ override val cf = Serializable.vertexCf
+ override def toRowKey: Array[Byte] = vertex.id.bytes
+
+ override def toQualifier: Array[Byte] = Array.empty[Byte]
+ override def toValue: Array[Byte] = Array.empty[Byte]
+
+ /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */
override def toKeyValues: Seq[SKeyValue] = {
- val row = vertex.id.bytes
+ val row = toRowKey
val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
(base ++ belongsTo).map { case (qualifier, value) =>